大数据技术之_18_大数据离线平台_02_Nginx+Mysql+数据收集+Web 工程 JS/JAVA SDK 讲解+Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上(示例代码
Posted chenmingjun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据技术之_18_大数据离线平台_02_Nginx+Mysql+数据收集+Web 工程 JS/JAVA SDK 讲解+Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上(示例代码相关的知识,希望对你有一定的参考价值。
十一、Nginx11.1、介绍11.2、常见其他 Web 服务器11.3、版本11.4、Nginx 安装11.5、目录结构11.6、操作命令十二、Mysql12.1、介绍12.2、关系型数据库(SQL)种类12.3、特征12.4、术语12.4、与非关系型数据库比较(Not Only SQL)12.4.1、种类12.4.2、特征12.4.3、总结十三、数据收集13.1、收集方式13.2、数据的事件类型13.2.1、Launch 事件13.2.2、PageView 事件13.3、Nginx 日志收集服务器13.3.1、字段信息13.3.2、Nginx 日志格式13.3.3、Nginx 配置13.4、Flume 数据采集13.4.1、编写 Flume 脚本上传日志文件到 HDFS13.4.2、将 core-site.xml 和 hdfs-site.xml 文件软连接到 flume 的 conf 文件夹中13.5、启动 Flume 采集测试十四、Web 工程 JS/JAVA SDK 讲解十五、Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上15.1、先分割 Nginx 日志,按天保存15.1.1、脚本编写15.1.2、其他环境准备15.1.2、定时任务15.2、再手动上传 Nginx 日志文件到 HDFS 上15.2.1、脚本编写15.2.2、测试脚本
十一、Nginx
11.1、介绍
nginx 一个高性能的 web 服务器,对于静态资源的访问速度特别快,单台机器能够支持 50000+ 并发服务,占用内存比较少、速度快、结构扩展容易;主要用于数据分流、服务器备份、静态资源的访问。
11.2、常见其他 Web 服务器
- 1、tomcat: 最常用的 Web 服务器
- 2、jetty: 内嵌数据库最常用的 Web 服务器
- 3、netty: 内嵌数据库常用的 Web 服务器
- 4、IIS: windows 集成的 Web 服务器
11.3、版本
http://tengine.taobao.org/tengine-2.0.2
Tengine 是由淘宝网发起的 Web 服务器项目。它在 Nginx 的基础上,针对大访问量网站的需求,添加了很多高级功能和特性。Tengine 的性能和稳定性已经在大型的网站如淘宝网、天猫商城等得到了很好的检验。它的最终目标是打造一个高效、稳定、安全、易用的 Web 平台。
从 2011 年 12 月开始,Tengine 成为一个开源项目,Tengine 团队在积极地开发和维护着它。Tengine 团队的核心成员来自于淘宝、搜狗等互联网企业。Tengine 是社区合作的成果,我们欢迎大家参与其中,贡献自己的力量。
11.4、Nginx 安装
Step1、在 module 目录下创建 nginx 目录
$ mkdir /opt/module/nginx
Step2、下载安装包
# wget http://tengine.taobao.org/download/tengine-2.0.2.tar.gz
Step3、解压
$ tar -zxf tengine-2.0.2.tar.gz -C /opt/module/nginx/
Step4、安装依赖服务(注意:需要使用root用户进行安装)
# yum -y install gcc openssl-devel zlib-devel pcre-devel
Step5、部署安装(注意:需要使用root用户进行安装)
[[email protected] tengine-2.0.2]# pwd
/opt/module/nginx/tengine-2.0.2
[[email protected] tengine-2.0.2]# ./configure
[[email protected] tengine-2.0.2]# make && make install
[[email protected] tengine-2.0.2]# cd /usr/local/nginx/(查看安装是否成功)
11.5、目录结构
- 安装目录:/usr/local/nginx/
- 操作命令目录:/usr/local/nginx/sbin
- 配置文件目录:/usr/local/nginx/conf
- 默认的页面存储目录:/usr/local/nginx/html
11.6、操作命令
* 启动 Nginx
# sbin/nginx
* 查看 Nginx 进程是否存在
# ps -ef | grep nginx
* 查看 80 端口是否在使用
# netstat -tunlp | grep 80
* 关闭服务
# sbin/nginx -s stop
* 重新加载配置项
# sbin/nginx -s reload
* 校验 conf 文件夹中 nginx.conf 文件格式是否正确
# sbin/nginx -t
* 帮助命令
# sbin/nginx -h
尖叫提示1
:如果出现如下错误,请下载对应依赖包
./configure: error: C compiler cc is not found
./configure: error: the HTTP rewrite module requires the PCRE library解决方案
:yum install gcc 和 yum install pcre-devel
尖叫提示2
:通过浏览器测试访问:http://192.168.25.102/,发现出现中文乱码
解决方案
:最新版本的Chrome浏览器如何设置网页编码 问题解决成功后的截图
十二、Mysql
12.1、介绍
一个非常常用的关系型数据库(RDBMS)。
常用的引擎:
MyISAM:mysql5.0 之前默认引擎,插入/查询速度快,体积比较小,不支持事务。
InnoDB:MySQL5.x 之后默认引擎,支持ACID事务,支持行级别的锁机制。
12.2、关系型数据库(SQL)种类
- MySQL: 体积小、开源
- Oracle: 功能强大、技术支持力度大
- SQLServer: Windows 平台下的数据库
- PostgreSQL: 和 MySQL 类似,linux 平台下稳定性强,开源,经常用于默认数据库
- Derby: java 编写的基于内存的数据,体积小,支持两种模式:内嵌+服务器模式,经常用于内嵌数据库
12.3、特征
- 支持 Sql 查询
- 支持 ACID 特性
- 数据有严格的 schema(模式)
- 数据以表格的形式呈现
- 以行和列构成数据
- 行表示是事件的具体值
- 列表示的是对于值的数据域
- 若干行构成数据表
- 若干表构成数据库(database)
12.4、术语
- 数据库
- 数据表
- 列
- 行
- 冗余
- 主键
- 外键
- 索引: 唯一索引、复合索引等等
12.4、与非关系型数据库比较(Not Only SQL)
12.4.1、种类
- 键值对数据库:Redis 等,数据不是结构化,存储是一个 key 对应一个 value,查询也是如此,一般基于内存,速度比较快,常用于缓存
- 列式存储数据库:HBase、Cassandra 等
- 文档型数据库::MongoDB 等
- 图数据库:Neo4j 等,常见应用:知识图谱
12.4.2、特征
- 不支持 sql 查询,每个数据库都有自己查询操作 api
- 不支持事务,保证数据的最终一致性
- 不需要预定义模式,模式自由(分布式的、扩充简单)
- 异步复制:基于操作日志进行数据复制
- 访问速度较快(特定的场景)
12.4.3、总结
关系型数据库和 NoSQL 整合使用,一般使用 NoSQL 作为部分场景的数据存储,使用 Mysql 作为持久化的存储,例如:mysql+redis ==> redis(做缓存)
十三、数据收集
13.1、收集方式
13.2、数据的事件类型
13.2.1、Launch 事件
Launch 事件主要就是表示用户(访客)第一次到网站的事件类型,主要应用于计算新用户等类似任务的计算。
参数名 | 说明 |
---|---|
en | 事件名称,launch 事件中为:e_l |
ver | 版本号 |
pl | 平台名称,launch 事件中为:website |
sdk | sdk 版本号,website 平台中为:js |
u_ud | 用户 id,唯一标识访客(用户) |
u_mid | 会员 id,业务系统的用户 id |
u_sd | 会话 id,标识会话 id |
c_time | 客户端时间 |
l | 平台语言,window.navigator.language |
b_iev | 浏览器信息,window.navigator.userAgent |
b_rst | 浏览器屏幕大小,screen.width + "*" + screen.height |
13.2.2、PageView 事件
PageView 事件是 pc 端的基本事件类型,主要是描述用户访问网站信息,应用于基本的各个不同计算任务。
参数名 | 说明 |
---|---|
en | 事件名称,pageview 事件中为:e_pv |
p_url | 当前页面的 url |
p_ref | 前一个页面的 url,如果没有前一个页面,那么值为空 |
tt | 当前页面的标题 |
13.2.3、Event 事件
event 事件是专门记录用户对于某些特定事件/活动的触发行为,主要是用于计算各活动的活跃用户以及各个不同访问链路的转化率情况等任务。
参数名 | 说明 |
---|---|
en | 事件名称,event 事件中为:e_e |
ca | 事件的 category 值,即事件的种类名称,不为空 |
ac | 事件的 action 值,即事件的活动名称,不为空 |
du | 事件持续时间,可以为空 |
kv_ | 事件自定义属性键值对。比如 kv_keyname=value,这里的 keyname 和 value 就是用户自定义,支持在事件上定义多个属性键值对 |
13.2.4、ChargeRequest 事件
该事件的主要作用是记录用户产生订单的行为/数据,为统计计算订单相关的统计结果提供基础数据。
参数名 | 说明 |
---|---|
en | 事件名称,event事件为:e_crt |
oid | 订单 id(order id) |
on | 订单名称(order name) |
cua | 订单金额(currency amount) |
cut | 订单支付货币类型(currency type) |
pt | 订单支付方式(payment type) |
13.2.5、ChargeSuccess 事件
该事件的主要作用是记录用户在产生订单后,支付订单的行为。为统计订单转化率提供基础数据的支持。
参数名 | 说明 |
---|---|
u_mid | 会员 id,业务系统的用户 id |
en | 事件名称,ChargeSuccess 事件中为:e_cs |
oid | 订单 id(order id) |
c_time | 客户端时间 |
ver | 版本信息 |
pl | 平台名称,后台平台名称为:java_server |
sdk | sdk 名称,后台平台名称为:jdk |
13.2.6、ChargeRefund 事件
该事件的主要作用是记录用户对应订单退款的相关行为。为统计订单退款率提供基础数据的支持。
参数名 | 说明 |
---|---|
en | 事件名称,ChargeRefund 事件中为:e_cr |
u_mid | 会员 id,业务系统的用户 id |
oid | 订单 id(order id) |
c_time | 客户端时间 |
ver | 版本信息 |
pl | 平台名称,后台平台名称为:java_server |
sdk | sdk 名称,后台平台名称为:jdk |
13.3、Nginx 日志收集服务器
13.3.1、字段信息
字段 | 字段说明 |
---|---|
IP 地址 | 客户端的 IP 地址 |
服务器时间 | 访问服务器的时间(防止客户端时间发生异常) |
浏览器是否支持 Flash | 浏览器是否支持 Flash |
浏览器信息 | 浏览器类型等等 |
客户端时间 | 访问浏览器的时候,方便进行缓存,例如:当 url 没有变动时,浏览器将缓存,新的请求不会发送给服务器 |
13.3.2、Nginx 日志格式
- 分割字段:^A
- 格式举例:IP地址^A服务器时间^A请求参数
13.3.3、Nginx 配置
参考文档:
http://tengine.taobao.org/document/http_upstream_dynamic.html
注意
:Step1~Step7 操作均以 root 用户身份操作!
Step1、先备份 conf 目录下的 nginx.conf
# cp nginx.conf nginx.conf.bak
Step2、配置 nginx.conf 如下
# http 标签中添加如下
# 定义日志格式
log_format user_log_format ‘$remote_addr^A$msec^A$request_uri‘;
# server 标签中添加如下
server_name hadoop102;
# server 标签中添加 location 标签如下
# 新增一个 location,匹配所有以 what.png 结尾的请求
location ~ .*(what).(png)$ {
# 设置请求类型为图片请求
default_type image/png;
# 记录日志,存储到一个 flume 用户可以读取的文件夹中,需要修改权限
access_log /usr/local/nginx/user_logs/access.log user_log_format;
# 给定存储图片的服务器位置
root /usr/local/nginx/html;
}
Step3、修改 nginx.conf 文件的用户以及用户组为 root:root
# chown root:root nginx.conf
Step4、修改 nginx.conf 权限为 644,即 rw-r—r—
# chmod 644 nginx.conf
Step5、创建存储用户日志的文件夹 /usr/local/nginx/user_logs,并将其权限修改为 777
# mkdir user_logs/
# chmod 777 user_logs/
Step6、将 what.png 图片文件移动到 /usr/local/nginx/html 文件夹中,并修改用户及权限和该文件夹中的其他文件一样
# cp /opt/software/what.png /usr/local/nginx/html/
Step7、重启 Nginx 服务
方式一:
# cat /usr/local/nginx/logs/nginx.pid
# kill -9 xxx
方式二:
# ps -ef | grep nginx
# kill -9 xxx
方式三:
# sbin/nginx -s stop
重启:
# sbin/nginx
Step8、进行网页刷新测试
Step9、观察 /usr/local/nginx/user_logs/access_log 下是否产生日志
当地址变为:http://192.168.25.102/what.png?money=1000 时,查看日志内容如下:
说明我们可以获取到网页的访问日志了!
13.4、Flume 数据采集
13.4.1、编写 Flume 脚本上传日志文件到 HDFS
flume-nginx-log-2-hdfs.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/nginx/user_logs/access.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/event-logs/%Y/%m/%d
a1.sinks.k1.hdfs.filePrefix=FlumeData
a1.sinks.k1.hdfs.fileSuffix=.log
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a1.sinks.k1.hdfs.batchSize = 10000
#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 0
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 131072000
#文件的滚动与 Event 数量无关
a1.sinks.k1.hdfs.rollCount = 0
#当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件
a1.sinks.k1.hdfs.idleTimeout = 60
#最小冗余数
a1.sinks.k1.hdfs.minBlockReplicas = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# 一般设置为 2 * 1024 * 1024 * 100 = 209715200
a1.channels.c1.capacity = 5120000
# 单个进程的最大处理能力
a1.channels.c1.transactionCapacity = 512000
a1.channels.c1.keep-alive=60
a1.channels.c1.byteCapacityBufferPercentage=10
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
13.4.2、将 core-site.xml 和 hdfs-site.xml 文件软连接到 flume 的 conf 文件夹中
$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml /opt/module/flume/conf/
$ ln -s /opt/module/hadoop-2.7.2/etc/hadoop/core-site.xml /opt/module/flume/conf/
13.5、启动 Flume 采集测试
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-nginx-log-2-hdfs.conf -Dflume.root.logger=INFO,console /opt/module/flume/logs/flume-nginx-log-2-hdfs.log 2>&1 &
十四、Web 工程 JS/JAVA SDK 讲解
准备工作:在 Linux 上安装 Eclipse
参考链接:https://www.cnblogs.com/chenmingjun/p/10712180.html
Web 工程目录结构
AnalyticsEngineSDK.java
package com.z.logmake;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 分析引擎sdk java服务器端数据收集
*/
public class AnalyticsEngineSDK {
// 日志打印对象
private static final Logger log = Logger.getGlobal();
// 请求url的主体部分
public static final String accessUrl = "http://hadoop-senior01.itguigu.com/what.png";
private static final String platformName = "java_server";
private static final String sdkName = "jdk";
private static final String version = "1";
/**
* 触发订单支付成功事件,发送事件数据到服务器
*
* @param orderId
* 订单支付id
* @param memberId
* 订单支付会员id
* @return 如果发送数据成功(加入到发送队列中),那么返回true;否则返回false(参数异常&添加到发送队列失败).
*/
public static boolean onChargeSuccess(String orderId, String memberId) {
try {
if (isEmpty(orderId) || isEmpty(memberId)) {
// 订单id或者memberid为空
log.log(Level.WARNING, "订单id和会员id不能为空");
return false;
}
// 代码执行到这儿,表示订单id和会员id都不为空。
Map<String, String> data = new HashMap<String, String>();
data.put("u_mid", memberId);
data.put("oid", orderId);
data.put("c_time", String.valueOf(System.currentTimeMillis()));
data.put("ver", version);
data.put("en", "e_cs");
data.put("pl", platformName);
data.put("sdk", sdkName);
// 创建url
String url = buildUrl(data);
// 发送url&将url加入到队列
SendDataMonitor.addSendUrl(url);
return true;
} catch (Throwable e) {
log.log(Level.WARNING, "发送数据异常", e);
}
return false;
}
/**
* 触发订单退款事件,发送退款数据到服务器
*
* @param orderId
* 退款订单id
* @param memberId
* 退款会员id
* @return 如果发送数据成功,返回true。否则返回false。
*/
public static boolean onChargeRefund(String orderId, String memberId) {
try {
if (isEmpty(orderId) || isEmpty(memberId)) {
// 订单id或者memberid为空
log.log(Level.WARNING, "订单id和会员id不能为空");
return false;
}
// 代码执行到这儿,表示订单id和会员id都不为空。
Map<String, String> data = new HashMap<String, String>();
data.put("u_mid", memberId);
data.put("oid", orderId);
data.put("c_time", String.valueOf(System.currentTimeMillis()));
data.put("ver", version);
data.put("en", "e_cr");
data.put("pl", platformName);
data.put("sdk", sdkName);
// 构建url
String url = buildUrl(data);
// 发送url&将url添加到队列中
SendDataMonitor.addSendUrl(url);
return true;
} catch (Throwable e) {
log.log(Level.WARNING, "发送数据异常", e);
}
return false;
}
/**
* 根据传入的参数构建url
*
* @param data
* @return
* @throws UnsupportedEncodingException
*/
private static String buildUrl(Map<String, String> data) throws UnsupportedEncodingException {
StringBuilder sb = new StringBuilder();
sb.append(accessUrl).append("?");
for (Map.Entry<String, String> entry : data.entrySet()) {
if (isNotEmpty(entry.getKey()) && isNotEmpty(entry.getValue())) {
// key和value不为空
sb.append(entry.getKey().trim()).append("=").append(URLEncoder.encode(entry.getValue().trim(), "utf-8"))
.append("&");
// 解码
// URLDecoder.decode("需要解码的内容", "utf-8");
}
}
return sb.substring(0, sb.length() - 1);// 去掉最后&
}
/**
* 判断字符串是否为空,如果为空,返回true。否则返回false。
*
* @param value
* @return
*/
private static boolean isEmpty(String value) {
return value == null || value.trim().isEmpty();
}
/**
* 判断字符串是否非空,如果不是空,返回true。如果是空,返回false。
*
* @param value
* @return
*/
private static boolean isNotEmpty(String value) {
return !isEmpty(value);
}
}
SendDataMonitor.java
package com.z.logmake;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 发送url数据的监控者,用于启动一个单独的线程来发送数据
*/
public class SendDataMonitor {
// 日志记录对象
private static final Logger log = Logger.getGlobal();
// 队列,用户存储发送url, 并发控制的Int.maxSize大小的阻塞队列
private BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
// 用于单列的一个类对象
private static SendDataMonitor monitor = null;
private SendDataMonitor() {
// 私有构造方法,进行单列模式的创建
}
/**
* 获取单例的monitor对象实例
*
* @return
*/
public static SendDataMonitor getSendDataMonitor() {
if (monitor == null) {
synchronized (SendDataMonitor.class) {
if (monitor == null) {
monitor = new SendDataMonitor();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 线程中调用具体的处理方法
SendDataMonitor.monitor.run();
}
});
// 测试的时候,不设置为守护模式
// thread.setDaemon(true);
thread.start();
}
}
}
return monitor;
}
/**
* 添加一个url到队列中去
*
* @param url
* @throws InterruptedException
*/
public static void addSendUrl(String url) throws InterruptedException {
getSendDataMonitor().queue.put(url);
}
/**
* 具体执行发送url的方法
*
*/
private void run() {
while (true) {
try {
// take 方法是阻塞方法,队列上有数据则取出,队列上没有数据则等待
String url = this.queue.take();
// 正式的发送url
HttpRequestUtil.sendData(url);
} catch (Throwable e) {
log.log(Level.WARNING, "发送url异常", e);
}
}
}
/**
* 内部类,用户发送数据的http工具类
*/
public static class HttpRequestUtil {
/**
* 具体发送url的方法
*
* @param url
* @throws IOException
*/
public static void sendData(String url) throws IOException {
HttpURLConnection con = null;
BufferedReader in = null;
try {
URL obj = new URL(url); // 创建url对象
con = (HttpURLConnection) obj.openConnection(); // 打开url连接
// 设置连接参数
con.setConnectTimeout(5000); // 连接过期时间
con.setReadTimeout(5000); // 读取数据过期时间
con.setRequestMethod("GET"); // 设置请求类型为get
System.out.println("发送url:" + url);
// 发送连接请求
in = new BufferedReader(new InputStreamReader(con.getInputStream()));
// TODO: 这里考虑是否可以
} finally {
try {
if (in != null) {
in.close();
}
} catch (Throwable e) {
// nothing
}
try {
con.disconnect();
} catch (Throwable e) {
// nothing
}
}
}
}
}
analytics.js
(function() {
var CookieUtil = {
// get the cookie of the key is name
get : function(name) {
var cookieName = encodeURIComponent(name) + "=", cookieStart = document.cookie
.indexOf(cookieName), cookieValue = null;
if (cookieStart > -1) {
var cookieEnd = document.cookie.indexOf(";", cookieStart);
if (cookieEnd == -1) {
cookieEnd = document.cookie.length;
}
cookieValue = decodeURIComponent(document.cookie.substring(
cookieStart + cookieName.length, cookieEnd));
}
return cookieValue;
},
// set the name/value pair to browser cookie
set : function(name, value, expires, path, domain, secure) {
var cookieText = encodeURIComponent(name) + "="
+ encodeURIComponent(value);
if (expires) {
// set the expires time
var expiresTime = new Date();
expiresTime.setTime(expires);
cookieText += ";expires=" + expiresTime.toGMTString();
}
if (path) {
cookieText += ";path=" + path;
}
if (domain) {
cookieText += ";domain=" + domain;
}
if (secure) {
cookieText += ";secure";
}
document.cookie = cookieText;
},
setExt : function(name, value) {
this.set(name, value, new Date().getTime() + 315360000000, "/");
}
};
// 主体,其实就是tracker js
var tracker = {
// config
clientConfig : {
// TODO 这里的url需要传入具体的地址
serverUrl : "http://hadoop-senior01.itguigu.com/what.png",
sessionTimeout : 360, // 360s -> 6min
maxWaitTime : 3600, // 3600s -> 60min -> 1h
ver : "1"
},
cookieExpiresTime : 315360000000, // cookie过期时间,10年
columns : {
// 发送到服务器的列名称
eventName : "en",
version : "ver",
platform : "pl",
sdk : "sdk",
uuid : "u_ud",
memberId : "u_mid",
sessionId : "u_sd",
clientTime : "c_time",
language : "l",
userAgent : "b_iev",
resolution : "b_rst",
currentUrl : "p_url",
referrerUrl : "p_ref",
title : "tt",
orderId : "oid",
orderName : "on",
currencyAmount : "cua",
currencyType : "cut",
paymentType : "pt",
category : "ca",
action : "ac",
kv : "kv_",
duration : "du"
},
keys : {
pageView : "e_pv",
chargeRequestEvent : "e_crt",
launch : "e_l",
eventDurationEvent : "e_e",
sid : "bftrack_sid",
uuid : "bftrack_uuid",
mid : "bftrack_mid",
preVisitTime : "bftrack_previsit",
},
/**
* 获取会话id
*/
getSid : function() {
return CookieUtil.get(this.keys.sid);
},
/**
* 保存会话id到cookie
*/
setSid : function(sid) {
if (sid) {
CookieUtil.setExt(this.keys.sid, sid);
}
},
/**
* 获取uuid,从cookie中
*/
getUuid : function() {
return CookieUtil.get(this.keys.uuid);
},
/**
* 保存uuid到cookie
*/
setUuid : function(uuid) {
if (uuid) {
CookieUtil.setExt(this.keys.uuid, uuid);
}
},
/**
* 获取memberID
*/
getMemberId : function() {
return CookieUtil.get(this.keys.mid);
},
/**
* 设置mid
*/
setMemberId : function(mid) {
if (mid) {
CookieUtil.setExt(this.keys.mid, mid);
}
},
startSession : function() {
// 加载js就触发的方法
if (this.getSid()) {
// 会话id存在,表示uuid也存在
if (this.isSessionTimeout()) {
// 会话过期,产生新的会话
this.createNewSession();
} else {
// 会话没有过期,更新最近访问时间
this.updatePreVisitTime(new Date().getTime());
}
} else {
// 会话id不存在,表示uuid也不存在
this.createNewSession();
}
this.onPageView();
},
onLaunch : function() {
// 触发launch事件
var launch = {};
launch[this.columns.eventName] = this.keys.launch; // 设置事件名称
this.setCommonColumns(launch); // 设置公用columns
this.sendDataToServer(this.parseParam(launch)); // 最终发送编码后的数据
},
onPageView : function() {
// 触发page view事件
if (this.preCallApi()) {
var time = new Date().getTime();
var pageviewEvent = {};
pageviewEvent[this.columns.eventName] = this.keys.pageView;
pageviewEvent[this.columns.currentUrl] = window.location.href; // 设置当前url
pageviewEvent[this.columns.referrerUrl] = document.referrer; // 设置前一个页面的url
pageviewEvent[this.columns.title] = document.title; // 设置title
this.setCommonColumns(pageviewEvent); // 设置公用columns
this.sendDataToServer(this.parseParam(pageviewEvent)); // 最终发送编码后的数据
this.updatePreVisitTime(time); // 更新最近访问时间
}
},
onChargeRequest : function(orderId, name, currencyAmount, currencyType,
paymentType) {
// 触发订单产生事件
if (this.preCallApi()) {
if (!orderId || !currencyType || !paymentType) {
this.log("订单id、货币类型以及支付方式不能为空");
return;
}
if (typeof (currencyAmount) == "number") {
// 金额必须是数字
var time = new Date().getTime();
var chargeRequestEvent = {};
chargeRequestEvent[this.columns.eventName] = this.keys.chargeRequestEvent;
chargeRequestEvent[this.columns.orderId] = orderId;
chargeRequestEvent[this.columns.orderName] = name;
chargeRequestEvent[this.columns.currencyAmount] = currencyAmount;
chargeRequestEvent[this.columns.currencyType] = currencyType;
chargeRequestEvent[this.columns.paymentType] = paymentType;
this.setCommonColumns(chargeRequestEvent); // 设置公用columns
this.sendDataToServer(this.parseParam(chargeRequestEvent)); // 最终发送编码后的数据ss
this.updatePreVisitTime(time);
} else {
this.log("订单金额必须是数字");
return;
}
}
},
onEventDuration : function(category, action, map, duration) {
// 触发event事件
if (this.preCallApi()) {
if (category && action) {
var time = new Date().getTime();
var event = {};
event[this.columns.eventName] = this.keys.eventDurationEvent;
event[this.columns.category] = category;
event[this.columns.action] = action;
if (map) {
// map如果不为空,进行内容的添加
for ( var k in map) {
// 循环key
if (k && map[k]) {
// 当key和value不为空的时候,进行添加操作
event[this.columns.kv + k] = map[k]; // key添加前缀"kv_"
}
}
}
if (duration) {
event[this.columns.duration] = duration; // 当duration不为0的时候进行添加
}
this.setCommonColumns(event); // 设置公用columns
this.sendDataToServer(this.parseParam(event)); // 最终发送编码后的数据ss
this.updatePreVisitTime(time);
} else {
this.log("category和action不能为空");
}
}
},
/**
* 执行对外方法前必须执行的方法
*/
preCallApi : function() {
if (this.isSessionTimeout()) {
// 如果为true,表示需要新建
this.startSession();
} else {
this.updatePreVisitTime(new Date().getTime());
}
return true;
},
sendDataToServer : function(data) {
// 发送数据data到服务器,其中data是一个字符串
// TODO:发送以前发送失败的数据
var that = this;
var i2 = new Image(1, 1);
i2.onerror = function() {
// 这里可以进行重试操作
// 当请求失败的情况下,执行这块的代码,可以将数据保存到local stroage中,下次再重新发送数据
};
// 给定图片的请求url
i2.src = this.clientConfig.serverUrl + "?" + data;
},
/**
* 往data中添加发送到日志收集服务器的公用部分
*/
setCommonColumns : function(data) {
data[this.columns.version] = this.clientConfig.ver;
data[this.columns.platform] = "website";
data[this.columns.sdk] = "js";
data[this.columns.uuid] = this.getUuid(); // 设置用户id
data[this.columns.memberId] = this.getMemberId(); // 设置会员id
data[this.columns.sessionId] = this.getSid(); // 设置sid
data[this.columns.clientTime] = new Date().getTime(); // 设置客户端时间
data[this.columns.language] = window.navigator.language; // 设置浏览器语言
data[this.columns.userAgent] = window.navigator.userAgent; // 设置浏览器类型
data[this.columns.resolution] = screen.width + "*" + screen.height; // 设置浏览器分辨率
},
/**
* 创建新的会员,并判断是否是第一次访问页面,如果是,进行launch事件的发送。
*/
createNewSession : function() {
var time = new Date().getTime(); // 获取当前操作时间
// 1. 进行会话更新操作
var sid = this.generateId(); // 产生一个session id
this.setSid(sid);
this.updatePreVisitTime(time); // 更新最近访问时间
// 2. 进行uuid查看操作
if (!this.getUuid()) {
// uuid不存在,先创建uuid,然后保存到cookie,最后触发launch事件
var uuid = this.generateId(); // 产品uuid
this.setUuid(uuid);
this.onLaunch(); // 触发launch事件
}
},
/**
* 参数编码返回字符串
*/
parseParam : function(data) {
var params = "";
for ( var e in data) {
if (e && data[e]) {
// 对key和value进行编码操作
params += encodeURIComponent(e) + "="
+ encodeURIComponent(data[e]) + "&";
}
}
if (params) {
return params.substring(0, params.length - 1);
} else {
return params;
}
},
/**
* 产生uuid<br/>
* UUID的产生逻辑,可以参考Java中UUID的生产代码
*/
generateId : function() {
var chars = ‘0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz‘;
var tmpid = [];
var r;
tmpid[8] = tmpid[13] = tmpid[18] = tmpid[23] = ‘-‘;
tmpid[14] = ‘4‘;
for (i = 0; i < 36; i++) {
if (!tmpid[i]) {
r = 0 | Math.random() * 16;
tmpid[i] = chars[(i == 19) ? (r & 0x3) | 0x8 : r];
}
}
return tmpid.join(‘‘);
},
/**
* 判断这个会话是否过期,查看当前时间和最近访问时间间隔时间是否小于this.clientConfig.sessionTimeout<br/>
* 如果是小于,返回false;否则返回true。
*/
isSessionTimeout : function() {
var time = new Date().getTime();
var preTime = CookieUtil.get(this.keys.preVisitTime);
if (preTime) {
// 最近访问时间存在,那么进行区间判断
return time - preTime > this.clientConfig.sessionTimeout * 1000;
}
return true;
},
/**
* 更新最近访问时间
*/
updatePreVisitTime : function(time) {
CookieUtil.setExt(this.keys.preVisitTime, time);
},
/**
* 打印日志
*/
log : function(msg) {
console.log(msg);
},
};
// 对外暴露的方法名称
window.__AE__ = {
startSession : function() {
tracker.startSession();
},
onPageView : function() {
tracker.onPageView();
},
onChargeRequest : function(orderId, name, currencyAmount, currencyType,
paymentType) {
tracker.onChargeRequest(orderId, name, currencyAmount,
currencyType, paymentType);
},
onEventDuration : function(category, action, map, duration) {
tracker.onEventDuration(category, action, map, duration);
},
setMemberId : function(mid) {
tracker.setMemberId(mid);
}
};
// 自动加载方法
var autoLoad = function() {
// 进行参数设置
var _aelog_ = _aelog_ || window._aelog_ || [];
var memberId = null;
for (i = 0; i < _aelog_.length; i++) {
_aelog_[i][0] === "memberId" && (memberId = _aelog_[i][1]);
}
// 根据是给定memberid,设置memberid的值
memberId && __AE__.setMemberId(memberId);
// 启动session
__AE__.startSession();
};
// 调用
autoLoad();
})();
demo.html
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>测试页面1</title>
<!-- 第一种集成方式 -->
<script type="text/javascript" src="./js/analytics.js"></script>
</head>
<body>
测试页面1<br/>
跳转到:
<a href="demo.html">demo</a>
<a href="demo2.html">demo2</a>
<a href="demo3.html">demo3</a>
<a href="demo4.html">demo4</a>
</body>
</html>
demo2.html
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>测试页面2</title>
<script type="text/javascript" src="./js/analytics.js"></script>
</head>
<body>
测试页面2
<br/>
<label>orderid: 123456</label><br>
<label>orderName: 测试订单123456</label><br/>
<label>currencyAmount: 524.01</label><br/>
<label>currencyType: RMB</label><br/>
<label>paymentType: alipay</label><br/>
<button onclick="__AE__.onChargeRequest(‘123456‘,‘测试订单123456‘,524.01,‘RMB‘,‘alipay‘)">触发chargeRequest事件</button><br/>
跳转到:
<a href="demo.html">demo</a>
<a href="demo2.html">demo2</a>
<a href="demo3.html">demo3</a>
<a href="demo4.html">demo4</a>
</body>
</html>
demo3.html
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>测试页面3</title>
<script type="text/javascript" src="./js/analytics.js"></script>
</head>
<body>
测试页面3<br/>
<label>category: event的category名称</label><br/>
<label>action: event的action名称</label><br/>
<label>map: {"key1":"value1", "key2":"value2"}</label><br/>
<label>duration: 1245</label><br/>
<button onclick="__AE__.onEventDuration(‘event的category名称‘,‘event的action名称‘, {‘key1‘:12,‘key2‘:‘value2‘}, 1245)">触发带map和duration的事件</button><br/>
<button onclick="__AE__.onEventDuration(‘event的category名称‘,‘event的action名称‘)">触发不带map和duration的事件</button><br/>
跳转到:
<a href="demo.html">demo</a>
<a href="demo2.html">demo2</a>
<a href="demo3.html">demo3</a>
<a href="demo4.html">demo4</a>
</body>
</html>
demo4.html
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>测试页面4</title>
<!-- 第二种js集成方式 -->
<script type="text/javascript">
(function(){
var _aelog_ = _aelog_ || window._aelog_ || [];
// 设置_aelog_相关属性
_aelog_.push(["memberId","itguigu"]);
window._aelog_ = _aelog_; // 作用,传递参数
(function(){
var aejs = document.createElement(‘script‘);
aejs.type = ‘text/javascript‘;
aejs.async = true;
aejs.src = ‘./js/analytics.js‘; // 给定url
var script = document.getElementsByTagName(‘script‘)[0]; // 获取html页面中第一个script的标签
script.parentNode.insertBefore(aejs, script); // 插入新生成的script标签
})();
})();
</script>
</head>
<body>
测试页面4<br/>
在本页面设置memberid为itguigu<br/>
跳转到:
<a href="demo.html">demo</a>
<a href="demo2.html">demo2</a>
<a href="demo3.html">demo3</a>
<a href="demo4.html">demo4</a>
</body>
</html>
demo5.html
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>测试页面5---产生事件数据</title>
<script type="text/javascript" src="./js/analytics.js"></script>
<script type="text/javascript">
var CookieUtil = {
// get the cookie of the key is name
get : function(name) {
var cookieName = encodeURIComponent(name) + "=", cookieStart = document.cookie
.indexOf(cookieName), cookieValue = null;
if (cookieStart > -1) {
var cookieEnd = document.cookie.indexOf(";", cookieStart);
if (cookieEnd == -1) {
cookieEnd = document.cookie.length;
}
cookieValue = decodeURIComponent(document.cookie.substring(
cookieStart + cookieName.length, cookieEnd));
}
return cookieValue;
},
// set the name/value pair to browser cookie
set : function(name, value, expires, path, domain, secure) {
var cookieText = encodeURIComponent(name) + "="
+ encodeURIComponent(value);
if (expires) {
// set the expires time
var expiresTime = new Date();
expiresTime.setTime(expires);
cookieText += ";expires=" + expiresTime.toGMTString();
}
if (path) {
cookieText += ";path=" + path;
}
if (domain) {
cookieText += ";domain=" + domain;
}
if (secure) {
cookieText += ";secure";
}
document.cookie = cookieText;
},
setExt : function(name, value) {
this.set(name, value, new Date().getTime() + 315360000000, "/");
}
};
function generate(count) {
for (var i=0;i<count;i++) {
// 50%的几率更改会话id
if (GetRandomNum() > 4) {
// 设置cookie:bftrack_previsit的值为一年前即可
CookieUtil.setExt("bftrack_previsit", 123);
}
// 搜索转换事件流
__AE__.onEventDuration("搜索事件流", "搜索");
__AE__.onEventDuration("搜索事件流", "浏览搜索页面");
if (GetRandomNum() > 2) {
// 70%会访问详情页面
__AE__.onEventDuration("搜索事件流", "点击查看详情页");
__AE__.onEventDuration("搜索事件流", "详情页面浏览事件");
if (GetRandomNum() > 1) {
// 80%产生多次访问点击课程详情情况,多次访问次数随机
for (var j=0;j<GetRandomNum();j++) {
// 可能产生多个点击课程详情面情况
__AE__.onEventDuration("搜索事件流", "点击查看详情页");
__AE__.onEventDuration("搜索事件流", "详情页面浏览事件");
// 多次访问点击课程详情,可能触发订单产生事件
if (GetRandomNum() > 8) {
// 10%会产生订单
__AE__.onEventDuration("搜索事件流", "点击立即购买事件");
}
}
}
if (GetRandomNum() > 6) {
// 30%会产生订单,也就是说0.7*0.3=21%的几率会产生订单
__AE__.onEventDuration("搜索事件流", "点击立即购买事件");
}
}
// 30%的几率更改会话id
if (GetRandomNum() > 6) {
// 设置cookie:bftrack_previsit的值为一年前即可
CookieUtil.setExt("bftrack_previsit", 123);
}
// 40%的几率直接产生详情页面浏览事件
if (GetRandomNum() > 5) {
// 30%产生订单事件
__AE__.onEventDuration("搜索事件流", "详情页面浏览事件");
if (GetRandomNum() > 4) {
__AE__.onEventDuration("搜索事件流", "点击立即购买事件");
}
}
}
};
function GetRandomNum() {
// 产生一个0到9的随机数
var Range = 9;
var Rand = Math.random();
return Math.round(Rand * Range);
}
</script>
</head>
<body>
<h1>测试页面:产生事件数据</h1>
<p>下述案例参考北风网的搜索</p>
<p>事件流只考虑在前端产生的事件,而且只有发生在同一个会话的事件才能算做事一个事件流。只产生一个事件流的数据:
<br/>搜索-->浏览搜索页面-->点击查看详情页-->详情页面浏览事件-->点击立即购买事件</p>
<br/>
<label>事件流数量:<input type="text" id="count"/></label>
<button id="btn" onclick="generate(document.getElementById(‘count‘).value)">触发chargeRequest事件</button><br/>
跳转到:
<a href="demo.html">demo</a>
<a href="demo2.html">demo2</a>
<a href="demo3.html">demo3</a>
<a href="demo4.html">demo4</a>
</body>
</html>
index.html
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>Insert title here</title>
</head>
<body>
启动成功<br/>
跳转到:
<a href="demo.html">demo</a>
<a href="demo2.html">demo2</a>
<a href="demo3.html">demo3</a>
<a href="demo4.html">demo4</a>
</body>
</html>
analytics.js代码小图解
十五、Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上
如果 Flume 实时上传日志的进程挂掉了,那么一般第二天我们需要手动上传前一天的日志,在上传之前,我们不能对所有的日志进行重复上传,所以需要对日志进行按天切割。
15.1、先分割 Nginx 日志,按天保存
图解示例如下:
15.1.1、脚本编写
脚本 split_nginx_log.sh
#!/bin/bash
#实时日志存放地路径
LOGS_PATH="/usr/local/nginx/user_logs"
#生成每天旧日志存放地路径(可以先测试下该句代码的正确性:命令 echo $OLD_LOGS_PATH)
OLD_LOGS_PATH=${LOGS_PATH}/logs/$(date -d "yesterday" +%Y)/$(date -d "yesterday" +%m)/$(date -d "yesterday" +%d)
#创建每天旧日志存放目录,如果目录不存在(方法1、先判断,再创建,否则可能出错。方法2、使用-p递归操作即使存在也不会报错。)
mkdir -p ${OLD_LOGS_PATH}
#移动昨天的日志文件到指定目录,并重命名该日志文件(便于管理和区分)
mv ${LOGS_PATH}/access.log ${OLD_LOGS_PATH}/access_$(date -d "yesterday" +%Y%m%d_%H%M%S).log
#因为 Nginx 持有文件 access.log 的句柄,所以移动该文件的时候,需要重启 Nginx,刷新文件句柄
kill -USR1 `cat /usr/local/nginx/logs/nginx.pid`
15.1.2、其他环境准备
Step1、创建该项目系列脚本存放的目录
$ mkdir -p /opt/module/mysbin
Step2、将脚本移动到该目录下
$ mv split_nginx_log.sh /opt/module/mysbin
Step3、确保脚本执行的用户拥有所需的权限(使用 root 用户修改)
# chown root:root split_nginx_log.sh
# chmod u+x split_nginx_log.sh
15.1.2、定时任务
使用 root 用户
# crontab -e
编辑内容如下:
# .------------------------------------------minute(0~59)
# | .----------------------------------------hours(0~23)
# | | .--------------------------------------day of month(1~31)
# | | | .------------------------------------month(1~12)
# | | | | .----------------------------------day of week(0~6)
# | | | | | .--------------------------------command
# | | | | | |
# | | | | | |
0 0 * * * /opt/module/mysbin/split_nginx_log.sh
15.2、再手动上传 Nginx 日志文件到 HDFS 上
15.2.1、脚本编写
脚本 put_nginx_log_2_hdfs.sh
#!/bin/bash
#计算昨天的日期(可以先测试下该句代码的正确性:命令 echo $yesterday)
yesterday=$(date --date=‘1 days ago‘ +‘%Y/%m/%d‘)
#今天之前的所有的日志存放的目录
LOGS_PATH=/usr/local/nginx/user_logs/logs
#HDFS 存放日志的目录
HDFS_LOGS_PATH=/event-logs/${yesterday}
export HADOOP_USER_NAME=atguigu
#递归创建 HDFS 存储目录
/opt/module/hadoop-2.7.2/bin/hdfs dfs -mkdir -p ${HDFS_LOGS_PATH}
#开始上传
/opt/module/hadoop-2.7.2/bin/hdfs dfs -put -f -p ${LOGS_PATH}/$(date -d "yesterday" +"%Y")/$(date -d "yesterday" +"%m")/$(date -d "yesterday" +"%d")/access_*.log ${HDFS_LOGS_PATH}
15.2.2、测试脚本
Step1、修改 HADOOP_USER_NAME 对应的 HDFS 操作用户名称,并且关闭权限检查
/opt/module/hadoop-2.7.2/etc/hadoop/hdfs-site.xml
# export HADOOP_USER_NAME=atguigu
<!-- 关闭权限检查-->
<property>
<name>dfs.permissions.enable</name>
<value>false</value>
</property>
Step2、将脚本 hdfs-site.xml 移动到 mysbin 目录下
# mv put_nginx_log_2_hdfs.sh /opt/module/mysbin
Step3、确保脚本执行的用户拥有所需的权限(使用 root 用户修改)
# chown root:root put_nginx_log_2_hdfs.sh
# chmod u+x put_nginx_log_2_hdfs.sh
Step4、执行上传文件脚本
# /opt/module/mysbin/put_nginx_log_2_hdfs.sh
测试成功,浏览器显示如下:
注意
:该上传脚本不用设置成定时任务,因为该脚本是在 flume 挂掉的时候,我们再去手动执行该脚本。
以上是关于大数据技术之_18_大数据离线平台_02_Nginx+Mysql+数据收集+Web 工程 JS/JAVA SDK 讲解+Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上(示例代码的主要内容,如果未能解决你的问题,请参考以下文章
大数据技术之_18_大数据离线平台_01_项目开发流程+大数据常用应用/分析平台/业务处理方式+数据分析平台的数据来源+数据处理的流程+项目集群的规模+需求分析+技术架构/选型
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池
大数据技术之_19_Spark学习_02_Spark Core 应用解析小结
大数据技术之_24_电影推荐系统项目_02_Python 基础语法