大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址
Posted chenmingjun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址相关的知识,希望对你有一定的参考价值。
二十、数据分析20.1、统计表20.2、目标20.3、代码实现20.3.1、Mapper20.3.2、Reducer20.3.3、Runner20.3.4、测试二十一、Hive 之 hourly 分析21.1、目标21.2、目标解析21.3、创建 Mysql 结果表21.4、Hive 分析21.4.1、创建 Hive 外部表,关联 HBase 数据表21.4.2、创建临时表用于存放 pageview 和 launch 事件的数据(即存放过滤数据)21.4.3、提取 e_pv 和 e_l 事件数据到临时表中21.4.4、创建分析结果临时保存表21.4.5、分析活跃访客数21.4.6、分析会话长度21.4.7、创建最终结果表21.4.8、向结果表中插入数据21.4.9、使用 Sqoop 导出 数据到 Mysql,观察数据二十二、常用 Maven 仓库地址
二十、数据分析
20.1、统计表
通过表结构可以发现,只要维度id确定了,那么 new_install_users 也就确定了。
20.2、目标
按照不同维度统计新增用户。比如:将 日、周、月 新增用户统计出来。传入的时间参数是: -date 2017-08-14
20.3、代码实现
20.3.1、Mapper
-
Step1、创建 NewInstallUsersMapper 类,outputKey 为 StatsUserDimension,outputValue 为 Text。定义全局变量,Key 和 Value 的对象。
-
Step2、覆写 map 方法,在该方法中读取 HBase 中待处理的数据,分别要包含维度的字段信息以及必有的字段信息。比如:serverTime、platformName、platformVersion、browserName、browserVersion、uuid。
-
Step3、数据过滤以及时间字符串转换。
-
Step4、构建维度信息:天维度,周维度,月维度,platform 维度[(name, version)(name, all)(all, all)],browser 维度[(browser, all) (browser, version)]。
-
Step5、设置 outputValue 的值为 uuid。
-
Step6、按照不同维度设置 outputKey。
-
Step7、将封装好的数据写入到 Mapper 的上下文对象中,输出给 Reducer。
示例代码如下:
NewInstallUsersMapper.java
package com.z.transformer.mr.statistics;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import com.z.transformer.common.DateEnum;
import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.common.KpiType;
import com.z.transformer.dimension.key.base.BrowserDimension;
import com.z.transformer.dimension.key.base.DateDimension;
import com.z.transformer.dimension.key.base.KpiDimension;
import com.z.transformer.dimension.key.base.PlatformDimension;
import com.z.transformer.dimension.key.stats.StatsCommonDimension;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.util.TimeUtil;
/**
* 思路:思路:HBase 读取数据 --> HBaseInputFormat --> Mapper --> Reducer --> DBOutPutFormat--> 这接写入到 mysql 中
*
* @author bruce
*/
public class NewInstallUserMapper extends TableMapper<StatsUserDimension, Text> {
// Mapper 的 OutPutKey 和 OutPutValue
// OutPutKey = StatsUserDimension 进行用户分析的组合维度(用户基本分析维度和浏览器分析维度)
// OutPutValue = Text uuid(字符串)
private static final Logger logger = Logger.getLogger(NewInstallUserMapper.class);
// 定义列族
private byte[] family = EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME;
// 定义输出 key
private StatsUserDimension outputKey = new StatsUserDimension();
// 定义输出 value
private Text outputValue = new Text();
// 映射输出 key 中的 StatsCommonDimension(公用维度) 属性,方便后续封装操作
private StatsCommonDimension statsCommonDimension = this.outputKey.getStatsCommon();
private long date, endOfDate; // 定义运行天的起始时间戳和结束时间戳
private long firstThisWeekOfDate, endThisWeekOfDate; // 定义运行天所属周的起始时间戳和结束时间戳
private long firstThisMonthOfDate, firstDayOfNextMonth; // 定义运行天所属月的起始时间戳和结束时间戳
// 创建 kpi 维度对象
private KpiDimension newInstallUsersKpiDimension = new KpiDimension(KpiType.NEW_INSTALL_USER.name);
private KpiDimension browserNewInstallUsersKpiDimension = new KpiDimension(KpiType.BROWSER_NEW_INSTALL_USER.name);
// 定义一个特殊占位的浏览器维度对象
private BrowserDimension defaultBrowserDimension = new BrowserDimension("", "");
// 初始化操作
@Override
protected void setup(Mapper<ImmutableBytesWritable, Result, StatsUserDimension, Text>.Context context)
throws IOException, InterruptedException {
// 1、获取参数配置项的上下文
Configuration conf = context.getConfiguration();
// 2、获取我们给定的运行时间参数,获取运行的是哪一天的数据
String date = conf.get(GlobalConstants.RUNNING_DATE_PARAMES);
// 传入时间所属当前天开始的时间戳,即当前天的0点0分0秒的毫秒值
this.date = TimeUtil.parseString2Long(date);
// 传入时间所属当前天结束的时间戳
this.endOfDate = this.date + GlobalConstants.DAY_OF_MILLISECONDS;
// 传入时间所属当前周的第一天的时间戳
this.firstThisWeekOfDate = TimeUtil.getFirstDayOfThisWeek(this.date);
// 传入时间所属下一周的第一天的时间戳
this.endThisWeekOfDate = TimeUtil.getFirstDayOfNextWeek(this.date);
// 传入时间所属当前月的第一天的时间戳
this.firstThisMonthOfDate = TimeUtil.getFirstDayOfThisMonth(this.date);
// 传入时间所属下一月的第一天的时间戳
this.firstDayOfNextMonth = TimeUtil.getFirstDayOfNextMonth(this.date);
}
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
// 1、获取属性,参数值,即读取 HBase 中的数据:serverTime、platformName、platformVersion、browserName、browserVersion、uuid
String serverTime = Bytes
.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME)));
String platformName = Bytes
.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_PLATFORM)));
String platformVersion = Bytes
.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_VERSION)));
String browserName = Bytes
.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME)));
String browserVersion = Bytes
.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION)));
String uuid = Bytes
.toString(value.getValue(family, Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_UUID)));
// 2、针对数据进行简单过滤(实际开发中过滤条件更多)
if (StringUtils.isBlank(platformName) || StringUtils.isBlank(uuid)) {
logger.debug("数据格式异常,直接过滤掉数据:" + platformName);
return; // 过滤掉无效数据
}
// 属性处理
long longOfServerTime = -1;
try {
longOfServerTime = Long.valueOf(serverTime); // 将字符串转换为long类型
} catch (Exception e) {
logger.debug("服务器时间格式异常:" + serverTime);
return; // 服务器时间异常的数据直接过滤掉
}
// 3、构建维度信息
// 获取当前服务器时间对应的当天维度的对象
DateDimension dayOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.DAY);
// 获取当前服务器时间对应的当周维度的对象
DateDimension weekOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.WEEK);
// 获取当前服务器时间对应的当月维度的对象
DateDimension monthOfDimension = DateDimension.buildDate(longOfServerTime, DateEnum.MONTH);
// 还可以获取 当季维度、当年维度......
// 构建平台维度对象
List<PlatformDimension> platforms = PlatformDimension.buildList(platformName, platformVersion);
// 构建浏览器维度对象
List<BrowserDimension> browsers = BrowserDimension.buildList(browserName, browserVersion);
// 4、设置 outputValue
this.outputValue.set(uuid);
// 5、设置 outputKey
for (PlatformDimension pf : platforms) {
// 设置浏览器维度(是个空的)
this.outputKey.setBrowser(this.defaultBrowserDimension);
// 设置平台维度
this.statsCommonDimension.setPlatform(pf);
// 下面的代码是处理对应于 stats_user 表的统计数据
// 设置 kpi 维度
this.statsCommonDimension.setKpi(this.newInstallUsersKpiDimension);
// 处理不同时间维度的情况
// 处理天维度数据,要求服务器时间处于指定日期的范围:[today, endOfDate)
if (longOfServerTime >= date && longOfServerTime < endOfDate) {
// 设置时间维度为服务器时间当天的维度
this.statsCommonDimension.setDate(dayOfDimension);
// 输出数据
context.write(outputKey, outputValue);
}
// 处理周维度数据,范围:[firstThisWeekOfDate, endThisWeekOfDate)
if (longOfServerTime >= firstThisWeekOfDate && longOfServerTime < endThisWeekOfDate) {
// 设置时间维度为服务器时间所属周的维度
this.statsCommonDimension.setDate(weekOfDimension);
// 输出数据
context.write(outputKey, outputValue);
}
// 处理月维度数据,范围:[firstThisMonthOfDate, firstDayOfNextMonth)
if (longOfServerTime >= firstThisMonthOfDate && longOfServerTime < firstDayOfNextMonth) {
// 设置时间维度为服务器时间所属月的维度
this.statsCommonDimension.setDate(monthOfDimension);
// 输出数据
context.write(outputKey, outputValue);
}
// 下面的代码是处理对应于 stats_device_browser 表的统计数据
// 设置 kpi 维度
this.statsCommonDimension.setKpi(this.browserNewInstallUsersKpiDimension);
for (BrowserDimension br : browsers) {
// 设置浏览器维度
this.outputKey.setBrowser(br);
// 处理不同时间维度的情况
// 处理天维度数据,要求当前事件的服务器时间处于指定日期的范围内,[今天0点, 明天0点)
if (longOfServerTime >= date && longOfServerTime < endOfDate) {
// 设置时间维度为服务器时间当天的维度
this.statsCommonDimension.setDate(dayOfDimension);
// 输出数据
context.write(outputKey, outputValue);
}
// 处理周维度数据,范围:[firstThisWeekOfDate, endThisWeekOfDate)
if (longOfServerTime >= firstThisWeekOfDate && longOfServerTime < endThisWeekOfDate) {
// 设置时间维度为服务器时间所属周的维度
this.statsCommonDimension.setDate(weekOfDimension);
// 输出数据
context.write(outputKey, outputValue);
}
// 处理月维度数据,范围:[firstThisMonthOfDate, firstDayOfNextMonth)
if (longOfServerTime >= firstThisMonthOfDate && longOfServerTime < firstDayOfNextMonth) {
// 设置时间维度为服务器时间所属月的维度
this.statsCommonDimension.setDate(monthOfDimension);
// 输出数据
context.write(outputKey, outputValue);
}
}
}
}
}
20.3.2、Reducer
-
Step1、创建
NewInstallUserReducer<StatsUserDimension, Text, StatsUserDimension, MapWritableValue>
类,覆写 reduce 方法。 -
Step2、统计 uuid 出现的次数,并且去重。
-
Step3、将数据拼装到 outputValue 中。
-
Step4、设置数据业务 KPI 类型,最终输出数据。
维度类结构图
我们再来回顾下大数据离线平台架构图:
示例代码如下:
NewInstallUserReducer.java
package com.z.transformer.mr.statistics;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.z.transformer.common.KpiType;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.dimension.value.MapWritableValue;
public class NewInstallUserReducer extends Reducer<StatsUserDimension, Text, StatsUserDimension, MapWritableValue> {
// 保存唯一 id 的集合 Set,用于计算新增的访客数量
private Set<String> uniqueSets = new HashSet<String>();
// 定义输出 value
private MapWritableValue outputValue = new MapWritableValue();
@Override
protected void reduce(StatsUserDimension key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 1、统计 uuid 出现的次数,去重
for (Text uuid : values) { // 增强 for 循环,遍历 values
this.uniqueSets.add(uuid.toString());
}
// 2、输出数据拼装
MapWritable map = new MapWritable();
map.put(new IntWritable(-1), new IntWritable(this.uniqueSets.size()));
this.outputValue.setValue(map);
// 3、设置 outputValue 数据对应描述的业务指标(kpi)
if (KpiType.BROWSER_NEW_INSTALL_USER.name.equals(key.getStatsCommon().getKpi().getKpiName())) {
// 表示处理的是 browser new install user kpi 的计算
this.outputValue.setKpi(KpiType.BROWSER_NEW_INSTALL_USER);
} else if (KpiType.NEW_INSTALL_USER.name.equals(key.getStatsCommon().getKpi().getKpiName())) {
// 表示处理的是 new install user kpi 的计算
this.outputValue.setKpi(KpiType.NEW_INSTALL_USER);
}
// 4、输出数据
context.write(key, outputValue);
}
}
20.3.3、Runner
-
Step1、创建 NewInstallUserRunner 类,实现 Tool 接口。
-
Step2、添加时间处理函数,用来截取参数。
-
Step3、组装 Job。
-
Step4、设置 HBase InputFormat(设置从 HBase 中读取的数据都有哪些)。
-
Step5、自定义 OutPutFormat 并设置。
示例代码如下:
NewInstallUserRunner.java
package com.z.transformer.mr.statistics;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.MultipleColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.z.transformer.common.EventLogConstants;
import com.z.transformer.common.EventLogConstants.EventEnum;
import com.z.transformer.common.GlobalConstants;
import com.z.transformer.dimension.key.stats.StatsUserDimension;
import com.z.transformer.dimension.value.MapWritableValue;
import com.z.transformer.mr.TransformerMySQLOutputFormat;
import com.z.transformer.util.TimeUtil;
public class NewInstallUserRunner implements Tool {
// 给定一个参数表示参数上下文
private Configuration conf = null;
public static void main(String[] args) {
try {
int exitCode = ToolRunner.run(new NewInstallUserRunner(), args);
if (exitCode == 0) {
System.out.println("运行成功");
} else {
System.out.println("运行失败");
}
System.exit(exitCode);
} catch (Exception e) {
System.err.println("执行异常:" + e.getMessage());
}
}
@Override
public void setConf(Configuration conf) {
// 添加自己开发环境所有需要的其他资源属性文件
conf.addResource("transformer-env.xml");
conf.addResource("output-collector.xml");
conf.addResource("query-mapping.xml");
// 创建 HBase 的 Configuration 对象
this.conf = HBaseConfiguration.create(conf);
}
@Override
public Configuration getConf() {
return this.conf;
}
@Override
public int run(String[] args) throws Exception {
// 1、获取参数上下文对象
Configuration conf = this.getConf();
// 2、处理传入的参数,将参数添加到上下文中
this.processArgs(conf, args);
// 3、创建 Job
Job job = Job.getInstance(conf, "new_install_users");
// 4、设置 Job 的 jar 相关信息
job.setJarByClass(NewInstallUserRunner.class);
// 5、设置 IntputFormat 相关配置参数
this.setHBaseInputConfig(job);
// 6、设置 Mapper 相关参数
// 在 setHBaseInputConfig 已经设置了
// 7、设置 Reducer 相关参数
job.setReducerClass(NewInstallUserReducer.class);
job.setOutputKeyClass(StatsUserDimension.class);
job.setOutputValueClass(MapWritableValue.class);
// 8、设置 OutputFormat 相关参数,使用一个自定义的 OutputFormat
job.setOutputFormatClass(TransformerMySQLOutputFormat.class);
// 9、Job 提交运行
boolean result = job.waitForCompletion(true);
// 10、运行成功返回 0,失败返回 -1
return result ? 0 : -1;
}
/**
* 处理时间参数,如果没有传递参数的话,则默认清洗前一天的。
*
* Job脚本如下: bin/yarn jar ETL.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2017-08-14
*
* @param args
*/
private void processArgs(Configuration conf, String[] args) {
String date = null;
for (int i = 0; i < args.length; i++) {
if ("-date".equals(args[i])) {
if (i + 1 < args.length) {
date = args[i + 1];
break;
}
}
}
// 查看是否需要默认参数
if (StringUtils.isBlank(date) || !TimeUtil.isValidateRunningDate(date)) {
date = TimeUtil.getYesterday(); // 默认时间是昨天
}
// 保存到上下文中间
conf.set(GlobalConstants.RUNNING_DATE_PARAMES, date);
}
/**
* 设置从 hbase 读取数据的相关配置信息
*
* @param job
* @throws IOException
*/
private void setHBaseInputConfig(Job job) throws IOException {
Configuration conf = job.getConfiguration();
// 获取已经执行ETL操作的那一天的数据
String dateStr = conf.get(GlobalConstants.RUNNING_DATE_PARAMES); // 2017-08-14
// 因为我们要访问 HBase 中的多张表,所以需要多个 Scan 对象,所以创建 Scan 集合
List<Scan> scans = new ArrayList<Scan>();
// 开始构建 Scan 集合
// 1、构建 Hbase Scan Filter 对象
FilterList filterList = new FilterList();
// 2、构建只获取 Launch 事件的 Filter
filterList.addFilter(new SingleColumnValueFilter(
EventLogConstants.BYTES_EVENT_LOGS_FAMILY_NAME, // 列族
Bytes.toBytes(EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME), // 事件名称
CompareOp.EQUAL, // 等于判断
Bytes.toBytes(EventEnum.LAUNCH.alias))); // Launch 事件的别名
// 3、构建部分列的过滤器 Filter
String[] columns = new String[] {
EventLogConstants.LOG_COLUMN_NAME_PLATFORM, // 平台名称
EventLogConstants.LOG_COLUMN_NAME_VERSION, // 平台版本
EventLogConstants.LOG_COLUMN_NAME_BROWSER_NAME, // 浏览器名称
EventLogConstants.LOG_COLUMN_NAME_BROWSER_VERSION, // 浏览器版本
EventLogConstants.LOG_COLUMN_NAME_SERVER_TIME, // 服务器时间
EventLogConstants.LOG_COLUMN_NAME_UUID, // 访客唯一标识符 uuid
EventLogConstants.LOG_COLUMN_NAME_EVENT_NAME // 确保根据事件名称过滤数据有效,所以需要该列的值
};
// 创建 getColumnFilter 方法用于得到 Filter 对象
// 根据列名称过滤数据的 Filter
filterList.addFilter(this.getColumnFilter(columns));
// 4、数据来源表所属日期是哪些
long startDate, endDate; // Scan 的表区间属于[startDate, endDate)
long date = TimeUtil.parseString2Long(dateStr); // 传入时间所属当前天开始的时间戳,即当前天的0点0分0秒的毫秒值
long endOfDate = date + GlobalConstants.DAY_OF_MILLISECONDS; // 传入时间所属当前天结束的时间戳
long firstDayOfWeek = TimeUtil.getFirstDayOfThisWeek(date); // 传入时间所属当前周的第一天的时间戳
long lastDayOfWeek = TimeUtil.getFirstDayOfNextWeek(date); // 传入时间所属下一周的第一天的时间戳
long firstDayOfMonth = TimeUtil.getFirstDayOfThisMonth(date); // 传入时间所属当前月的第一天的时间戳
long lastDayOfMonth = TimeUtil.getFirstDayOfNextMonth(date); // 传入时间所属下一月的第一天的时间戳
// [date,
// [firstDayOfWeek
// [firstDayOfMonth
// 选择最小的时间戳作为数据输入的起始时间,date 一定大于等于其他两个 first 时间戳值
// 获取起始时间
startDate = Math.min(firstDayOfMonth, firstDayOfWeek);
// 获取结束时间
endDate = TimeUtil.getTodayInMillis() + GlobalConstants.DAY_OF_MILLISECONDS;
if (endOfDate > lastDayOfWeek || endOfDate > lastDayOfMonth) {
endDate = Math.max(lastDayOfMonth, lastDayOfWeek);
} else {
endDate = endOfDate;
}
// 获取连接对象,执行,这里使用 HBase 的 新 API
Connection connection = ConnectionFactory.createConnection(conf);
Admin admin = null;
try {
admin = connection.getAdmin();
} catch (Exception e) {
throw new RuntimeException("创建 HBaseAdmin 对象失败", e);
}
// 5、构建我们 scan 集合
try {
for (long begin = startDate; begin < endDate;) {
// 格式化 HBase 的后缀
String tableNameSuffix = TimeUtil.parseLong2String(begin, TimeUtil.HBASE_TABLE_NAME_SUFFIX_FORMAT); // 20170814
// 构建表名称:tableName = event_logs20170814
String tableName = EventLogConstants.HBASE_NAME_EVENT_LOGS + tableNameSuffix;
// 需要先判断表存在,然后当表存在的情况下,再构建 Scan 对象
if (admin.tableExists(TableName.valueOf(tableName))) {
// 表存在,进行 Scan 对象创建
Scan scan = new Scan();
// 需要扫描的 HBase 表名设置到 Scan 对象中
scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
// 设置过滤对象
scan.setFilter(filterList);
// 添加到 Scan 集合中
scans.add(scan);
}
// begin 累加
begin += GlobalConstants.DAY_OF_MILLISECONDS;
}
} finally {
// 关闭 Admin 连接
try {
admin.close();
} catch (Exception e) {
// nothing
}
}
// 访问 HBase 表中的数据
if (scans.isEmpty()) {
// 没有表存在,那么 Job 运行失败
throw new RuntimeException("HBase 中没有对应表存在:" + dateStr);
}
// 指定Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
TableMapReduceUtil.initTableMapperJob(
scans, // Scan 扫描控制器集合
NewInstallUserMapper.class, // 设置 Mapper 类
StatsUserDimension.class, // 设置 Mapper 输出 key 类型
Text.class, // 设置 Mapper 输出 value 值类型
job, // 设置给哪个 Job
true); // 如果在 Windows 上本地运行,则 addDependencyJars 参数必须设置为 false,如果打成 jar 包提交 Linux 上运行设置为 true,默认为 true
}
/**
* 获取一个根据列名称过滤数据的 Filter
*
* @param columns
* @return
*/
private Filter getColumnFilter(String[] columns) {
byte[][] prefixes = new byte[columns.length][];
for (int i = 0; i < columns.length; i++) {
prefixes[i] = Bytes.toBytes(columns[i]);
}
return new MultipleColumnPrefixFilter(prefixes);
}
}
20.3.4、测试
Step1、使用 maven 插件:maven-shade-plugin,将第三方依赖的 jar 全部打包进去,需要在 pom.xml 中配置依赖。参考【章节 十七、工具代码导入】中的 pom.xml 文件。
1、-P local clean package(不打包第三方jar)
2、-P dev clean package install(打包第三方jar)(推荐使用这种,本案例使用这种方式)
Step2、在 hadoop-env.sh 添加内容:
[[email protected] hadoop]$ pwd
/opt/module/hadoop-2.7.2/etc/hadoop
[[email protected] hadoop]$ vim hadoop-env.sh
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*
尖叫提示
:修改该配置后,需要配置分发,然后重启集群,方可生效!!!
Step3、打包成功后,将要运行的 transformer-0.0.1-SNAPSHOT.jar 拷贝至 /opt/module/hbase/lib 目录下,然后同步到其他机器或者配置分发:
同步到其他机器
[[email protected] ~]$ scp -r /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar hadoop103:/opt/module/hbase/lib/
[[email protected] ~]$ scp -r /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar hadoop104:/opt/module/hbase/lib/
或者配置分发
[[email protected] ~]$ xsync /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar
尖叫提示
:如果没有同步到其他机器或者配置分发,会出现类找不到异常,如下:
执行异常:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.z.transformer.dimension.key.stats.StatsUserDimension not found
4、运行 jar 包,命令如下:
先进行数据清洗
$ /opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.etl.AnalysisDataRunner -date 2015-12-20
再进行统计运算
$ /opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/hbase/lib/transformer-0.0.1-SNAPSHOT.jar com.z.transformer.mr.statistics.NewInstallUserRunner -date 2015-12-20
二十一、Hive 之 hourly 分析
尖叫提示
:由于 “-” 在 HBase 的表名中允许,在 Hive 的表名中不可以是 “-”,即在 Hive 中,“-” 是特殊字符,为了方便和统一,所以我们将 “-” 的地方替换为 “_”。这样就三者统一了。即 HDFS 上存放数据的目录变为 /event_logs/2015/12/20,HBase 数据库中的表名变为 event_logs20151220,Hive 中的表名为 event_logsxxx。
21.1、目标
分析一天 24 个时间段的新增用户、活跃用户、会话个数和会话长度四个指标,最终将结果保存到 HDFS 中,使用 sqoop 导出到 Mysql。
21.2、目标解析
- 新增用户:分析 launch 事件中各个不同时间段的 uuid 数量
- 活跃用户:分析 pageview 事件中各个不同时间段的 uuid 数量
- 会话个数:分析 pageview 事件中各个不同时间段的 会话id 数量
- 会话长度:分析 pageview 事件中各个不同时间段内所有会话时长的总和
21.3、创建 Mysql 结果表
21.4、Hive 分析
21.4.1、创建 Hive 外部表,关联 HBase 数据表
21.4.2、创建临时表用于存放 pageview 和 launch 事件的数据(即存放过滤数据)
21.4.3、提取 e_pv 和 e_l 事件数据到临时表中
21.4.4、创建分析结果临时保存表
21.4.5、分析活跃访客数
Step1、具体平台,具体平台版本(platform:name, version:version)
Step2、具体平台,所有版本(platform:name, version:all)
Step3、所有平台,所有版本(platform:all, version:all)
21.4.6、分析会话长度
将每个会话的长度先要计算出来,然后统计一个时间段的各个会话的总和。
Step1、具体平台,具体平台版本(platform:name, version:version)
Step2、具体平台,所有版本(platform:name, version:all)
Step3、所有平台,所有版本(platform:all, version:all)
21.4.7、创建最终结果表
我们在这里需要创建一个和 Mysql 表结构一致的 Hive 表,便于后期使用 Sqoop 导出数据到 Mysql 中。
21.4.8、向结果表中插入数据
我们需要 platform_dimension_id int, date_dimension_id int, kpi_dimension_id int 三个字段,所以我们需要使用 UDF 函数生成对应的字段。
Step1、编写 UDF 函数,见代码
Step2、编译打包 UDF 函数代码
Step3、上传 UDF 代码 jar 包到 HDFS
Step4、使用 UDF 的 jar
Step5、执行最终数据统计
21.4.9、使用 Sqoop 导出 数据到 Mysql,观察数据
二十二、常用 Maven 仓库地址
常用 Maven 仓库地址
中央库:http://repo.maven.apache.org/maven2/
CDN库:https://repository.cloudera.com/artifactory/cloudera-repos/
Maven 中央仓库最近更新的 Artifact:http://maven.outofmemory.cn/
Search/Browse/Explore Maven Repository:https://mvnrepository.com/
以上是关于大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址的主要内容,如果未能解决你的问题,请参考以下文章
大数据技术之_18_大数据离线平台_02_Nginx+Mysql+数据收集+Web 工程 JS/JAVA SDK 讲解+Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上(示例代码
大数据技术之_24_电影推荐系统项目_04_推荐系统算法详解
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池
大数据技术之_04_Hadoop学习_02_HDFS_DataNode(面试开发重点)+HDFS 2.X新特性
大数据技术之_08_Hive学习_04_压缩和存储(Hive高级)+ 企业级调优(Hive优化)
大数据技术之_03_Hadoop学习_01_入门_大数据概论+从Hadoop框架讨论大数据生态+Hadoop运行环境搭建(开发重点)