应用概况story文档初稿
Posted 彭宇成
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了应用概况story文档初稿相关的知识,希望对你有一定的参考价值。
1. 作为
用户行为分析产品的核心功能之一,
2. 我需要
2.1 实时统计出当前在线人数(OV)、累计浏览次数(PV)、访客数(UV)、登录次数(VV)与注册次数(RV)共5个基础指标以及累计下载、卸载人数共两个app端个性指标。
2.2 离线统计,并以趋势图展示美的金融app在过去一段时间内的UV与PV趋势、下载与卸载趋势、登录与注册趋势。
3. 这样
业务人员就能通过这些基础指标,轻松掌握美的金融app的整体运营状况。
4. 整体设计
4.1 架构设计
如上图示,根据业务指标分离线与实时两部分:
1)数据的获取-实时部分 :Spark Streaming 定时(暂定 10秒钟,可配置)从消息中间件kafka的independence_midea_finance_app_base_action与independence_midea_finance_user_action 主题中拉取数据,清洗、聚合得到当天的 实时业务指标数据集 datas1,其中 kafka的相关环境信息详见附件。然后,将datas1 中各字段值累加到 user_action_es/finance_app_fundamentals 文档相关field中,得到当前累加的各指标数据集 datas2;
2)数据的获取-离线部分
略。
3)数据在web系统的处理与展示:整个web系统基于spring boot 1.5.6 的微服务架构实现,保证系统的灵活性、稳定性与高可用。web后端定时(暂定10秒,可配置)从数据集datas2中拉取、聚合分析相关字段,并以 RESTful API的形式将数据返回给 模板引擎 H5 展示。
4.1.1 kafka 相关参数
详见附件
4.2 ES文档结构设计
实时部分
说明:在线人数指标只统计当天的数据即可,因此无需在该表设计相关字段
4.3 核心代码
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(AppBasicInfoStatistic.class);
// Cache of generated document ids keyed by "<date><metric suffix>"; within this JVM a key
// that was seen before is given the id that was generated for it the first time.
private static Map<String, String> dateToDocid = new ConcurrentHashMap<String, String>();
// Generator of new document ids.
// NOTE(review): the meaning of the constructor argument 1L is not visible here — confirm.
private static IdWorker idWorker = new IdWorker(1L);
public static void main(String[] args)
/*
* streaming运行环境设置
*/
SparkConf conf = new SparkConf();
setSparkEnvs(conf);
JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(ConfigureManager.getInteger(Constants.STREAMING_DURATION)));
jssc.checkpoint(ConfigureManager.getProperty(Constants.CHECKPOINT_DIR));
jssc.ssc().sparkContext().setLogLevel("ERROR");
/*
* 从kafka中读取用户的基础行为数据
*/
JavaPairDStream<String, String> userBaseActionLogDStream = getBaseActionLogDStream(jssc);
/*
* 业务处理:实时计算pv、uv与安装用户量
*/
String indexName = ConfigureManager.getProperty("es.realtime.index");
JavaPairDStream<String, Long> pVLogDStream = getPvLogDStream(userBaseActionLogDStream);
JavaDStream<PVEntity> pvsDStream = getFormattedDStream(pVLogDStream, new PVEntity());
JavaEsSparkStreaming.saveToEs(pvsDStream, indexName);
List<String> uvs = new ArrayList<String>();
JavaPairDStream<String, Long> uVLogDStream = getUVLogDStream(userBaseActionLogDStream, uvs);
JavaDStream<UVEntity> uvsDStream = getFormattedDStream(uVLogDStream, new UVEntity());
JavaEsSparkStreaming.saveToEs(uvsDStream, indexName);
List<String> installers = new ArrayList<String>();
JavaPairDStream<String, Long> installerLogDStream = getInstallerLogDStream(
userBaseActionLogDStream, installers);
JavaDStream<InstallerEntity> installerDStream = getFormattedDStream(installerLogDStream,
new InstallerEntity());
JavaEsSparkStreaming.saveToEs(installerDStream, indexName);
/*
* 启动
*/
jssc.start();
jssc.awaitTermination();
jssc.close();
/**
* function:设置streaming运行环境
*
* @param conf
*/
private static void setSparkEnvs(SparkConf conf)
conf.setAppName("AppBasicInfoStatistic");
conf.set(Constants.ES_NODES, ConfigureManager.getProperty(Constants.ES_NODES));
conf.set(Constants.ES_INDEX_AUTO_CREATE,
ConfigureManager.getProperty(Constants.ES_INDEX_AUTO_CREATE));
conf.set(Constants.ES_MAPPING_ID, ConfigureManager.getProperty(Constants.ES_MAPPING_ID));
conf.set(Constants.ES_PORT, ConfigureManager.getProperty(Constants.ES_PORT));
/**
* function:以directAPI方式从kafka中读取基础行为数据数据
*
* @param jssc
* JavaStreamingContext
* @return JavaPairInputDStream<String, String>
*/
private static JavaPairDStream<String, String> getBaseActionLogDStream(JavaStreamingContext jssc)
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list",
ConfigureManager.getProperty("metadata.broker.list"));
kafkaParams.put("group.id", ConfigureManager.getProperty("group.id"));
String kafkaTopics = ConfigureManager.getProperty("topics");
String[] kafkaTopicsSplited = kafkaTopics.split(Constants.SEMICOLON);
Set<String> topics = new HashSet<String>();
for (String kafkaTopic : kafkaTopicsSplited)
topics.add(kafkaTopic);
JavaPairDStream<String, String> userBaseActionLogDStream = KafkaUtils.createDirectStream(
jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics);
return userBaseActionLogDStream;
/**
* function:实时获取当天的Pv量
*
* @param mappedLogDStream
* @return JavaPairDStream<String, Long>
*/
@SuppressWarnings("deprecation")
private static JavaPairDStream<String, Long> getPvLogDStream(JavaPairDStream<String, String> userBaseActionLogDStream)
JavaPairDStream<String, Long> mappedDStream = userBaseActionLogDStream.mapToPair(new PairFunction<Tuple2<String, String>, String, Long>()
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(Tuple2<String, String> actionLog)
throws Exception
String date = null;
JsonElement dateElem = null;
try
JsonObject jObject = new JsonParser().parse(actionLog._2).getAsJsonObject();
dateElem = jObject.get(Constants.ACTION_CREATE_TIME);
date = DateUtil.transferLongToDate(DateUtil.DATE_FORMAT_1,
dateElem.getAsLong());
catch (NumberFormatException exp)
date = dateElem.getAsString().trim().split(Constants.BLANK)[0];
catch (Exception exp)
LOG.error("fail to parse action log from kakfa." + exp.toString());
return new Tuple2<String, Long>(date, Long.valueOf(1));
);
JavaPairDStream<String, Long> pVLogDStream = mappedDStream.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>()
private static final long serialVersionUID = 1L;
@Override
public Optional<Long> call(List<Long> values, Optional<Long> optional)
throws Exception
long pvs = 0l;
if (optional.isPresent())
pvs = optional.get();
for (long value : values)
pvs += value;
return Optional.of(pvs);
);
return pVLogDStream;
/**
* function:获取转换成T后的DStream,方便写入 ES
*
* @param pairDStream
* @param t
* entity PVEntity,UVEntity or InstallerEntity
* @return JavaDStream<T>
*/
private static <T> JavaDStream<T> getFormattedDStream(JavaPairDStream<String, Long> pairDStream,
final T t)
return pairDStream.transform(new Function<JavaPairRDD<String, Long>, JavaRDD<T>>()
private static final long serialVersionUID = -1603265641717571106L;
@Override
public JavaRDD<T> call(JavaPairRDD<String, Long> rdd)
throws Exception
return rdd.map(new Function<Tuple2<String, Long>, T>()
private static final long serialVersionUID = 1L;
@SuppressWarnings("unchecked")
@Override
public T call(Tuple2<String, Long> tuple)
throws Exception
String date = tuple._1;
if (t instanceof PVEntity)
String guid = dateToDocid.get(date + Constants.PV_SUFFIX);
if (null == guid)
guid = idWorker.nextId() + "";
dateToDocid.put(date + Constants.PV_SUFFIX, guid);
return (T)new PVEntity(guid, date, tuple._2);
if (t instanceof InstallerEntity)
String guid = dateToDocid.get(date + Constants.INSTALLER_SUFFIX);
if (null == guid)
guid = idWorker.nextId() + "";
dateToDocid.put(date + Constants.INSTALLER_SUFFIX, guid);
return (T)new InstallerEntity(guid, date, tuple._2);
String guid = dateToDocid.get(date + Constants.UV_SUFFIX);
if (null == guid)
guid = idWorker.nextId() + "";
dateToDocid.put(date + Constants.UV_SUFFIX, guid);
return (T)new UVEntity(guid, date, tuple._2);
);
);
/**
* function: 实时获取当天的uv量 DStream中数据流举例:json格式行为数据 => 2017-9-21_pengyc,2017-9-21_pengyc =>(去重)
* 2017-9-21_pengyc => (2017-9-21,1) =>(2017-9-21,nums)
*
* @param mappedLogDStream
* @return JavaPairDStream<String, Long>
*/
@SuppressWarnings("deprecation")
private static JavaPairDStream<String, Long> getUVLogDStream(JavaPairDStream<String, String> userBaseActionLogDStream,
final List<String> uvs)
return userBaseActionLogDStream.map(new Function<Tuple2<String, String>, String>()
private static final long serialVersionUID = 1L;
@Override
public String call(Tuple2<String, String> actionLog)
throws Exception
String date = "";
String user_account = "";
JsonElement dateElem = null;
try
JsonObject jObject = new JsonParser().parse(actionLog._2).getAsJsonObject();
dateElem = jObject.get(Constants.ACTION_CREATE_TIME);
date = DateUtil.transferLongToDate(DateUtil.DATE_FORMAT_1,
dateElem.getAsLong());
// 目前游客的user_account为空,后续在前端为其分配一个唯一标识,暂时将所有的游客标识都设置为 youke_id
user_account = (("".equals(jObject.get(Constants.USER_ACCOUNT))
|| null == jObject.get(Constants.USER_ACCOUNT)) ? "youke_id" : jObject.get(
Constants.USER_ACCOUNT).getAsString());
catch (NumberFormatException exp)
date = dateElem.getAsString().trim().split(Constants.BLANK)[0];
catch (Exception exp)
LOG.error("fail to parse action log from kakfa." + exp.toString());
return date + "_" + user_account;
).transform(new Function<JavaRDD<String>, JavaRDD<String>>()
private static final long serialVersionUID = 1L;
@SuppressWarnings("resource")
@Override
public JavaRDD<String> call(JavaRDD<String> rdd)
throws Exception
List<String> batchUVs = new ArrayList<String>(rdd.distinct().collect());
batchUVs.removeAll(uvs);
uvs.addAll(batchUVs);
return new JavaSparkContext(rdd.context()).parallelize(uvs);
).mapToPair(new PairFunction<String, String, Long>()
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(String t)
throws Exception
return new Tuple2<String, Long>(t.split("_")[0], 1L);
).reduceByKey(new Function2<Long, Long, Long>()
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2)
throws Exception
return v1 + v2;
);
/**
* function: 实时获取当天的用户安装数量 DStream中数据流举例:json格式 action_type=install的行为数据 =>
* 2017-9-21_pengyc,2017-9-21_pengyc =>(去重) 2017-9-21_pengyc => (2017-9-21,1)
* =>(2017-9-21,nums)
*
* @param mappedLogDStream
* @param installers
* @return JavaPairDStream<String, Long>
*/
@SuppressWarnings("deprecation")
private static JavaPairDStream<String, Long> getInstallerLogDStream(JavaPairDStream<String, String> userBaseActionLogDStream,
final List<String> installers)
return userBaseActionLogDStream.filter(new Function<Tuple2<String, String>, Boolean>()
private static final long serialVersionUID = 1777746689772288156L;
@Override
public Boolean call(Tuple2<String, String> actionLog)
throws Exception
JsonObject jObject = new JsonParser().parse(actionLog._2).getAsJsonObject();
String action_type = jObject.get(Constants.ACTION_TYPE).getAsString();
if (Constants.TYPE_INSTALL.equals(action_type))
return true;
return false;
).map(new Function<Tuple2<String, String>, String>()
private static final long serialVersionUID = 1L;
@Override
public String call(Tuple2<String, String> actionLog)
throws Exception
String date = "";
String user_account = "";
JsonElement dateElem = null;
try
JsonObject jObject = new JsonParser().parse(actionLog._2).getAsJsonObject();
dateElem = jObject.get(Constants.ACTION_CREATE_TIME);
date = DateUtil.transferLongToDate(DateUtil.DATE_FORMAT_1,
dateElem.getAsLong());
// 目前游客的user_account为空,后续在前端为其分配一个唯一标识,暂时将所有的游客标识都设置为 youke_id
user_account = (("".equals(jObject.get(Constants.USER_ACCOUNT))
|| null == jObject.get(Constants.USER_ACCOUNT)) ? "youke_id" : jObject.get(
Constants.USER_ACCOUNT).getAsString());
catch (NumberFormatException exp)
date = dateElem.getAsString().trim().split(Constants.BLANK)[0];
catch (Exception exp)
LOG.error("fail to parse action log from kakfa." + exp.toString());
return date + "_" + user_account;
).transform(new Function<JavaRDD<String>, JavaRDD<String>>()
private static final long serialVersionUID = 1L;
@SuppressWarnings("resource")
@Override
public JavaRDD<String> call(JavaRDD<String> rdd)
throws Exception
List<String> batchUVs = new ArrayList<String>(rdd.distinct().collect());
batchUVs.removeAll(installers);
installers.addAll(batchUVs);
return new JavaSparkContext(rdd.context()).parallelize(installers);
).mapToPair(new PairFunction<String, String, Long>()
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Long> call(String t)
throws Exception
return new Tuple2<String, Long>(t.split("_")[0], 1L);
).reduceByKey(new Function2<Long, Long, Long>()
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2)
throws Exception
return v1 + v2;
);
总结
1、当前每个实时指标,单独对应一个Entity,可以考虑将所有的实时指标关联到一个Entity。
2、用java7写sparkstreaming太冗余了,考虑到机器学习主流代码用python写的,后续将所有的大数据分析代码迁移到python上来。
以上是关于应用概况story文档初稿的主要内容,如果未能解决你的问题,请参考以下文章