1111111111111111222222222

Posted 十一vs十一

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1111111111111111222222222相关的知识,希望对你有一定的参考价值。

第2章 100W请求秒杀架构体系-冷热数据收集
和隔离
目标1:垂直日志收集实现
目标2:Apache Druid
海量日志实时分析
目标2:热点数据隔离实现
目标4:Lua解析JWT令牌高效校验身份
1 日志收集
日志在我们项目中是非常重要的,日志的作用也有差异,例如根据日志查找问题、根据日志做数据分
析。在我们秒杀系统中,活跃的热点商品其实并不多,我们往往需要对热点商品进行额外处理。用户每
次抢购商品的时候,都是从详情页发起的,因此统计热度商品,详情页的访问频次可以算一个方向,详
情页访问的频次我们可以记录访问日志,然后统计某一段时间的访问量,根据访问量评判商品是否是热
点商品。
1.1 业务分析
日志收集流程如上图,用户请求经过nginx,此时已经留下了用户对某个商品访问的足迹,我们可以在
这里将用户访问的商品信息发送给我们kafka,采用大数据实时分析工具 Apache Druid 实时存储访问
信息,再通过程序分析计算访问情况。
1.2 Kafka
从上面流程图中,可以看到实现日志收集中间件是MQ,我们秒杀系统中会搭建MQ服务。
目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ、ActiveMQ,我们这里对每款MQ做一个
简单介绍。
Kafka
 
 
RocketMQ
RabbitMQ
kafka官网:http://kafka.apache.org/
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的
(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以
满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,
web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基
金会并成为顶级开源 项目。
1.快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化;
2.高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率;
3.高堆积:支持topic下消费者较长时间离线,消息堆积量大;
4.完全的分
布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自
动实现复杂均衡;
5.支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理
的限制,这是
6.高并
同时读写
RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ。RocketMQ是一款分布式、队
列模型的消息中间件,具有以下特点 :
1.能够保证严格的消息顺序
2.提供丰富的消息拉取模式
3.高效的订阅者水平扩展能力
4.实时的消息订阅机制
5.支持事务消息
6.亿级消息堆积能力
使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如
此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将
消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load
balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。
 
 
1.2.1 Kafka搭建
单机版的kafka搭建非常简单,不过我们今天采用Docker搭建kafka。Kafka使用Zookeeper存储
Consumer、Broker信息,安装kafak的时候,需要先安装Zookeeper。
Zookeeper安装:
讲解:
Kafka安装:
讲解:
docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime
wurstmeister/zookeeper
/etc/localtime:/etc/localtime:使容器与宿主机时间能够同步
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e
KAFKA_ZOOKEEPER_CONNECT=172.17.0.5:2181/kafka -e
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.5:9092 -e
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime
wurstmeister/kafka
KAFKA_BROKER_ID:当前Kafka的唯一ID
KAFKA_ZOOKEEPER_CONNECT:当前Kafka使用的Zookeeper配置信息
KAFKA_ADVERTISED_LISTENERS:对外发布(暴露)的监听器,对外发布监听端口、地址
KAFKA_LISTENERS:监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服
务。
 
 
IP更改:
外部程序如果想链接Kafka,需要根据IP链接,所以我们可以给Kafka一个IP名字,编
辑: /opt/kafka_2.12-2.4.1/config/server.properties ,在文件最末尾添加如下代码:
1.2.2 队列创建
进入kafka容器,创建队列
:
讲解:
1.2.3 消息发布
在kafka容器中执行消息发送(接着上面的步骤执行):
讲解:
我们发送的消息如下(输入信息,回车即可发送):
1.2.4 消息订阅
host.name=192.168.211.137
docker exec -it kafka /bin/sh
cd /opt/kafka_2.12-2.4.1/bin
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication
factor 1 --partitions 1 --topic itheima
解释:使用kafka-topics.sh创建队列
--create:执行创建一个新的队列操作
--bootstrap-server:需要链接的kafka配置,必填
--replication-factor 1:设置分区的副本数量
--topic itemaccess:队列的名字叫itemaccess
./kafka-console-producer.sh --broker-list localhost:9092 --topic itemaccess
解释:使用kafka-console-producer.sh实现向kafka的test队列发送消息
--broker-list:指定将消息发给指定的Kafka服务的链接列表配置
HOST1:Port1,HOST2:Port2
--topic itemaccess:指定要发送消息的队列名字
"actime":"2022-5-11 11:50:10","uri":"http://www
seckill.itheima.net/items/555.html","IP":"119.123.33.231","Token":"Bearer
itheima"
 
 
在kafka容器中执行消息订阅(接着上面的步骤执行,但要先按ctrl+c退出控制台):
讲解:
查看已经存在的主题:
删除主题:
查看主题信息:
1.2.5 信息查看
上面执行整个流程如下图:
Kafka注册信息查看:
我们进入到zookeeper中,可以查看到kafka的注册信息,相关操作命令如下:
效果如下:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic itemaccess
--from-beginning
解释:使用kafka-console-consumer.sh从kafka中消费test队列的数据
--boo
t
s
t
r
a
p
-
se
rv
e
r:
从指
的k
a
fk
a中读取消息
--top
ic
i
t
e
m
a
cc
e
ss
:读
队列
-
-from-beginning:从最开始的数据读取,也就是读取所有数据的意思
./kafka-topics.sh --zookeeper localhost:3181 --list
./kafka-topics.sh --zookeeper localhost:3181 --delete --topic itemaccess
./kafka-topics.sh --zookeeper localhost:3181 --describe --topic itemaccess
docker exec -it zookeeper /bin/bash
cd bin
./zkCli.sh
ls /
 
 
关于Kafka的学习,大家可以直接参考:http://kafka.apache.org/quickstart
1.3 收集日志-Lua
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,
其设计目的是为了嵌入应用
程序中,从而为应用程序提供灵活的扩展和定制功能。
OpenResty® 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三
方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web
服务和动态网关。OpenResty 通过lua脚本扩展nginx功能,可提供负载均衡、请求路由、安全认证、
服务鉴权、流量控制与日志监控等服务。
OpenResty® 通过汇聚各种设计精良的 Nginx 模块(主要由 OpenResty 团队自主开发),从而将
Nginx 有效地变成一个强大的通用 Web 应用平台。这样,Web 开发人员和系统工程师可以使用 Lua 脚
本语言调动 Nginx 支持的各种 C 以及 Lua 模块,快速构造出足以胜任 10K 乃至 1000K 以上单机并发连
接的高性能 Web 应用系统。
关于Lua的基本知识,我们这里就不学习了,直接进入日志收集的使用操作。
1.3.1 OpenRestry安装
关于OpenRestry的学习,大家可以参考:http://openresty.org/cn/
下载OpenRestry:
解压:
安装(进入到解压目录进行安装):
wget https://openresty.org/download/openresty-1.11.2.5.tar.gz
tar -xf openresty-1.11.2.5.tar.gz
 
 
软件会安装到
/usr/local/openresty ,这里面会包含nginx。
配置环境变量:
1.3.2 详情页发布
商品详情页生成后会存储在 /usr/local/server/web/items 目录下,详情页是静态网页,我们可以使
用Nginx直接发布。
商品详情页的访问:http://192.168.211.137/items/S1235433012716498944.html,我们可以让所有
以 /items/ 的请求直接到 /usr/local/server/web/ 目录下找。
修改nginx.conf:
修改内容如下:
启动nginx,并访问测试:http://192.168.211.137/items/S1235433012716498944.html
1.3.3 Lua日志收集
cd openresty-1.11.2.5
./configure --prefix=/usr/local/openresty --with-luajit --without
http_redis2_module --with-http_stub_status_module --with-http_v2_module --with
http_gzip_static_module --with-http_sub_module
make
make install
vi /etc/profile
export PATH=/usr/local/openresty/nginx/sbin:$PATH
source /etc/profile
cd /usr/local/openresty/nginx/conf/
vi nginx.conf
 
 
使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件 lua
restry-kafka 。关于 lua-restry-kafka 的下载和使用,可以参考
https://github.com/doujiang24/lua-resty-kafka
1)收集流程
日志收集流程如下:  看到
 
 
 
用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信
息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,
也能完成文件的访问。
2)插件配置
lua-restry-kafka :https://github.com/doujiang24/lua-resty-kafka
在 资料\\lua 中已经提供了该包 lua-resty-kafka-master.zip ,我们需要将该文件上传
到 /usr/local/openrestry 目录下,并解压,再配置使用。
解压:
unzip lua-resty-kafka-master.zip
配置:
修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:
lua_package_path "/usr/local/openresty/lua-resty-kafka-master/lib/?.lua;;";
配置效果图如下:3)日志收集
用户访问详情页的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这
里要实现Kafka消息生产者。
我们定义一个消息格式:
生产者脚本:
rewrite_by_lua
定义好了消息格式后,创建一个生产者,往Kafka中发送详情页的访问信息。我们创建一个lua脚
本, items-access.lua ,脚本内容如下:
上图脚本内容如下:
"actime": "
2020-4-10 9:50:30",
"uri":
"h
t
t
p
:/
/
1
9
2
.1
6
8
.211.137/items/S1235433012716498944.html",
"ip": "
1
1
9
.
12
3
.
3
3
.2
3
1
",
"token": "Bearer ITHEIMAOOPJAVAITCAST"
--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list =
host = "192.168.211.137", port = 9092
--创建生产者
local pro = producer:new(broker_list, producer_type="async")
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or
ngx.var.remote_addr or "0.0.0.0"
 
 
4)nginx配置
按照上面的流程图,我们需要配置nginx的2个location,修改nginx.conf,代码如下:
上图代码如下:
--定义消息内容
local logjson =
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer ITHEIMA"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")
--发送消息
local offset,
err = pro:send("itemaccess", nil, cjson.encode(logjson))
server
listen 80;
server_name localhost;
#/web开始的请求,做日志记录,然后跳转到下面的location
location /web/items/
content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
#商品详情页,以/items/开始的请求,直接在详情页目录下找文件
 
 
5)日志收集测试
请求地址:http://192.168.211.137/web/items/S1235433012716498944.html
查看Kafka的itemaccess
队列数据:
2 Apache Druid日志实时分析
2.1 业务分析
秒杀业务中,通常会有很多用户同时蜂拥而上去抢购热卖商品,经常会出现抢购人数远大于商品库存。
其实在秒杀过程中,热卖商品并不多,几乎只占1%,而99%的流量都源自热卖商品,很有可能因为这
1%的热卖商品导致服务器宕机,因此针对热卖商品我们要做特殊处理。
热卖商品我们这里称为热点商品,针对热点商品的处理,有这么几种思路,一是优化,二是限制,三是
隔离。
优化:优化热点数据最有效的办法就是缓存热点数据。
限制:限制其实是一种削峰手段,我们可以把热点商品抢单采用队列来存储用户抢单信息,将热点抢单
限制在一个队列里,防止热点商品抢单占用太多的资源服务,而使得其他服务无法获取抢单机会。
隔离:隔离其实就是将热点商品和非热点商品进行数据源的隔离、操作流程的隔离,不要因为1%的热
点数据影响到另外的99%数据。我们可以把热点商品数据存储到缓存中和非热点数据分开,抢单程序也
可以和非热点抢单分开。
热点数据又分为离线热点数据和实时热点数据,离线热点数据主要是分析过往热点商品信息,这个统计
起来并无难度,可以直接从历史数据库中查询分析。但根据用户抢单实时数据进行分析是一个很困难的
事,首先要存储大量的访问信息,同时还能高效的实时统计访问日志信息,从中获取热点商品信息。
location /items/
#日志处理
#content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
root /usr/local/server/web/;
 
 
OLTP
OLAP
用户
面向操作人员,支持日常操作
面向决策人员,支持管理需要
功能
日常操作处理
分析决策
DB 设计
面向应用,事务驱动
面向主题,分析驱动
数据
当前的,最新的细节的
历史的,聚集的,多维的,集成的,统一的
存取
可更新,读/写数十条记录
不可更新,但周期性刷新,读上百万条记录
工作单位
简单的事务
复杂的查询(海量数据)
DB 大小
100MB-GB
100GB-TB/PB
2.2 Apache Druid
2.2.1 Apache Druid介绍
介绍
Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时
摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的
多维 OLAP 分析。
另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚
合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。
OLTP与OLAP的区别:
OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理。
OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的分析查
询结果。
OLAP和OLTP区别:
OLTP就是面向我们的应用系统数据库的,OLAP是面向数据仓库的。
Apache Druid 特性:
 
 
开源OLAP数据处理系统性能方面我们做个对比:
Apache Druid 架构设计
Druid自身包含下面4类节点:
同时,Druid集群还包含以下3类外部依赖:
亚秒响应的交互式查询,支持较高并发。
支持实时导入,导入即可被查询,支持高并发导入。
采用分布式 shared-nothing 的架构,可以扩展到PB级。
支持聚合函数,count 和 sum,以及使用 javascript 实现自定义 UDF。
支持复杂的 Aggregator,近似查询的 Aggregator 例如 HyperLoglog 以及 Yahoo 开源的
DataSketches。
支持Groupby,Select,Search查询。
1.Realtime Node:即时摄入实时数据,生成Segment(LSM-Tree实现与Hbase基本一致)文件。
2.Historical Node:加载已生成好的数据文件,以供数据查询。
3.Broker Node:对外提供数据查询服务,并同时从Realtime Node和Historical Node查询数据,合
并后返回给调用方。
4.Coordinator Node:负责Historical Node的数据负载均衡,以及通过Rule管理数据生命周期。
1.元数据库(Metastore):存储druid集群的元数据信息,如Segment的相关信息,一般使用MySQL或
PostgreSQL
2.分布式协调服务(Coordination):为Druid集群提供一致性服务,通常为zookeeper
3.数据文件存储(DeepStorage):存储生成的Segment文件,供Historical Node下载,一般为使用
HDFS
 
 
数据摄入
Apache Druid同时支持流式和批量数据摄入。通常通过像 Kafka 这样的消息总线(加载流式数据)或
通过像 HDFS 这样的分布式文件系统(加载批量数据)来连接原始数据源。
2.2.2 Apache Druid安装
Apache Druid的安装方面,我们可以参考官方文档实现。
JDK:java8(8u92+)
下载地址:https://druid.apache.org/downloads.html
 
 
解压该压缩包:
包文件如下:
启动单机版Apache Druid:
启动后,访问:http://192.168.211.137:8888
2.2.3 数据摄入
2.2.3.1 离线数据摄入
从一个文件中将数据加载到 Apache Druid ,参考地址:
W <https://druid.apache.org/docs/latest/tutorials/tutorial-batch.html> ,如下操作:
1)点击Load data->Local disk->Connect data
tar -xf a
p
ac
he
-d
ru
i
d
-
0
.17.0-bin.tar.gz
cd apache
-d
ru
id
-0
.
1
7
.
0
./bin/start-micro-quickstart
 
 
2)选择要导入的数据
我们要导入的数据在 /usr/local/server/apache-druid-
0.17.0/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz ,需要把该文件的相对路径
填写到右边表单中,再点击Apply,如下图:
 
 
 
3)解析数据
在上一个步骤上点击Next:Parse data,此时会解析导入的数据,如下图:
4)解析时间
在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的
数据没有该列,Apache Druid会自动帮助我们创建该列,如下图:5)数据分区设置
点击下一步一直到Partition,我们根据需要设置数据分区方式,如下图:
 
 
 
讲解:
Type:数据粒度使用的类型
Segment granularity:分片文件每个segment包含的时间戳范围
Force guaranteed rollup:是否启用批量推送模式
Partitioning type:分区类型
Max rows per segment:用于分片。确定每个段中的行数。
更多参数如下图:
 
6)设置数据源
Publish设置,注意设置数据源名字,这里类似数据库中数据库名字。
7)提交配置
最后一步需要提交配置,如下图,点击submit即可。
2.2.3.2 实时数据摄入
前面的案例是离线数据的摄入,接着我们实现实时数据摄入,我们以收集用户访问商品详情页的访问记
录为例,如下图:
参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html
 
 
 
 
 
1)load data
2)配置Kafka源
3)配置数据源名字
其他的步骤和之前文件摄入一样,直到配置数据源名字,我们配置数据源名字叫itemlogs,最后一步
submit和之前一样,如下图:查询效果如下:
2.2.4 Druid SQL
2.2.4.1 简介
Apache Druid SQL是一个内置的SQL层,是Druid基于JSON的查询语言的替代品,由基于Apache
Calcite的解析器和规划器提供支持。Druid SQL将SQL转换为Broker本机Druid查询,然后将其传递给
数据进程。除了在Broker上转换SQL的(轻微)开销之外,与本机查询相比,没有额外的性能损失。
2.2.4.2 语法
每个Druid数据源都显示为“Druid”模式,这也是默认模式,Druid数据源引用为 druid.dataSourceName
或者简单引用 dataSourceName 。
可以选择使用双引号引用数据源和列名等标识符。要在标识符中转义双引号,请使用另一个双引号,例
如 "My ""cat"" identifier" ,所有标识符都区分大小写。
文字字符串应引用单引号,如 \'foo\' ,文字数字可以用 100 (表示整数), 100.0 (表示浮点值)或
1.0e5 (科学记数法)等形式编写。时间戳可以写成 TIMESTAMP \'2000-01-01 00:00:00\' 。时间算
法,可以这样写 INTERVAL \'1\' HOUR , INTERVAL \'1 02:03\' DAY TO MINUTE , INTERVAL \'1-2\'
YEAR TO MONTH ,等等。
Druid SQL支持具有以下结构的SELECT查询:
 
 
查询所有:
查询count列:
查询前5条:
分组查询:
排序:
求和:
最大值:
[ EXPLAIN PLAN FOR ]
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] * | exprs
FROM table
[ WHERE expr ]
[ GROUP BY exprs ]
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit
]
[ UNION ALL <
a
nother query> ]
SELECT * FROM "itemlogs"
SELECT "count" FROM "itemlogs"
SELECT * FROM "itemlogs" LIMIT 5
SELECT ip FROM "itemlogs" GROUP BY ip
SELECT * FROM "itemlogs" ORDER BY __time DESC
SELECT SUM("count") FROM "itemlogs"
SELECT MAX("count") FROM "itemlogs"
 
 
平均值:
查询6年前的数据:
去除重复查询:
2.2.5 JDBC查询Apache Druid
Apache Calcite是面向Hadoop新的查询引擎,它提供了标准的SQL语言、多种查询优化和连接各种数
据源的能力,除此之外,Calcite还提供了OLAP和流处理的查询引擎。
如果使用java,可以使用Calcite JDBC驱动程序进行Druid SQL查询。可以下载Avatica客户端jar后,将
其添加到类路径并使用连接字符串
jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/
如果是Maven项目,需要引入 avatica-core 包,如下:
使用案例:
SELECT AVG("count") FROM "itemlogs"
SELECT * FROM
"wikiticker" WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL \'6\'
YEAR
SELECT DISTINCT "count" FROM "accessitem"
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.15.0</version>
</dependency>
public static void main(String[] args) throws Exception
//链接地址
String url =
"jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/";
AvaticaConnection connection = (AvaticaConnection)
DriverManager.getConnection(url);
//SQL语句,查询2020-4-10 11:50:30之后的访问uri和访问数量
String sql="SELECT uri,count(*) AS \\"viewcount\\" FROM(SELECT * FROM
\\"itemlogs\\" WHERE __time>\'2020-4-10 11:50:30\' ORDER BY __time DESC) GROUP BY
uri LIMIT 100";
//创建Statment
AvaticaStatement statement = connection.createStatement();
//执行查询
 
 
知识点:
Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代
码如下:
3 热点数据隔离
热点数据统计主要是为了找出热点数据,找出热点数据后,我们需要对热点数据采取各种措施,例如隔
离、做缓存、优化等。
3.1 热点数据隔离流程分析
我们这章实现热点数据收集,我们可以以小时为单位,算出平均每小时访问量最高的商品信息,并对该
商品信息进行隔离,下单方式也单独处理,流程如下图:
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next())
//获取uri
String uri = resultSet.getString("uri");
String viewcount = resultSet.getString("viewcount");
System.out.println(uri+"--------->"+viewcount);
sed -i "s/Duser.timezone=UTC/Duser.timezone=UTC+8/g" `grep Duser.timezone=UTC -
rl ./`
 
 
流程说明:
1.指定一个时间段内访问频率最高的商品->热点商品
2.将热点商品存入到Redis缓存
3.2 实时热点数据分析
我们在热点数据分析系统中查询Druid,然后将热点数据存入到Redis缓存进行隔离。我们可以采用
elastic-job每5秒钟查询一次被访问的商品信息,如果访问量超过1000,我们可以认为是热点数据,并
且这里不能查历史访问量,应该查询近期一段时间,比如最近1天最近1小时最近一分钟等。热点数据查
询出来后,我们需要将热点数据隔离,隔离的方式我们可以直接采用将数据单独存储到Redis的方式隔
离。
热点数据隔离:
3.2.1 热点数据查询
工程名字: seckill-monitor ,我们在该工程下实现热点数据查询功能,Redis集群我们就不在这里演
示搭建了,直接配置链接使用了。
1.实时读取Apache Druid的数据
2.分析哪些数据访问频率高
3.对访问频率高的数据进行隔离处理,可以把数据单独放到Redis缓存中
4.用户每次下单的时候,可以先到Redis缓存中检测该商品是否是热点商品,如果不是热点商品,则直接走
订单系统下单,如果是热点商品,则走Kafka排队,不直接下单
a.编写定时任务->定时查询Druid
b.配置Redis集群->热点商品存入到Redis实现隔离
c.每次定时查询热点商品的时候,需要排除之前已经成为热点商品的数据
 
 
1)配置Redis链接
在bootstrap.yml中配置redis集群链接,如下代码:
2)配置定时任务
因为我们需要定时去查询Apache Druid,所以我们可以配置elastic-job来查询热点数据,在
bootstrap.yml中配置如下:
3)热点数据查询
我们查询最近5小时访问量超过1000的商品,真实环境中时间粒度会更小,每次查询的时候,之前已经
被定为热点商品的数据要排除。
SQL语句如下:
接着我们用代码把上面的语句实现定时查询即可,每次查询出来的热点数据需要存入到Redis中进行隔
离,存入到Redis中的数据我们给个固定前缀方便查询,key的规则定为: SKU_id ,例如:商品
id=S990,key= SKU_S990 。
另外一种参考:
在bootstrap.yml中配置druid地址:
redis:
cluster:
nodes:
- red
i
s
-
s
e
r
v
e
r
:
7
0
0
1
- red
i
s
-
s
e
r
v
e
r
:
7
0
02
-
r
e
d
i
s
-
s
e
r
v
e
r
:
7
0
0
3
-
r
e
d
i
s
-
s
e
r
v
e
r
:
7
0
04
- redis-server:7005
- redis-server:7006
elaticjob:
zookeeper:
server-lists: zk-server:2181
namespace: monitortask
SELECT uri,count(*) AS "viewcount" FROM(SELECT * FROM "itemlogs" WHERE
__time>\'2020-04-10 14:01:46\' ORDER BY __time DESC) GROUP BY uri HAVING
"viewcount">1000 LIMIT 1000
SELECT COUNT(*) AS "ViewCount","uri" FROM "logsitems" WHERE
__time>=CURRENT_TIMESTAMP - INTERVAL \'1\' HOUR GROUP BY "uri" HAVING
"ViewCount">3
 
 
创建 com.seckill.monitor.hot.MonitorItemsAccess ,在该类中实现查询:
#Druid
druidurl:
jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/
@Component
public class
MonitorItemsAccess
@Value
("$druidurl")
private String druidurl;
@Aut
owired
private RedisTemplate redisTemplate;
/****
* 查询统计数据,1天以内的热点秒杀商品
* @throws Exception
*/
public List<String> loadData() throws Exception
//获取连接对象
AvaticaConnection connection = (AvaticaConnection)
DriverManager.getConnection(druidurl);
//创建Statment
AvaticaStatement statement = connection.createStatement();
//执行查询
ResultSet resultSet = statement.executeQuery(druidSQL());
//记录所有热点商品的ID
List<String> ids = new ArrayList<String>();
while (resultSet.next())
//获取uri,格式:/web/items/S1235433012716498944.html
String uri = resultSet.getString("uri");
//处理掉/web/items/和.html
if(uri.startsWith("/web/items/") && uri.endsWith(".html"))
uri=uri.replaceFirst("/web/items/","");
uri=uri.substring(0,uri.length()-5);
//记录ID
ids.add(uri);
return ids;
/***
* 组装SQL
* @return
*/
public String druidSQL()
//加载所有热点秒杀商品的ID
Set<String> keys = redisTemplate.keys("SKU_*");
//1天前的时间
String yesterday =
TimeUtil.date2FormatYYYYMMDDHHmmss(TimeUtil.addDateHour(new Date(), -72));
 
 
4)定时查询热点数据
我们这里实现每5秒中查询1次热点数据,采用 elastic-job 定时操作。
创建 com.seckill.monitor.task.MonitorTask ,实现定时调用查询热点数据,代码如下:
//SQL语句
String sql="SELECT uri,count(*) AS \\"viewcount\\" FROM(SELECT * FROM
\\"itemlogs\\" WHERE __time>\'"+yesterday+"\'";
//排除掉已经存在的数据
if(keys!=null && keys.size()>0)
StringBuffer buffer = new StringBuffer();
for (String key : keys)
buffer.append("\'/web/items/"+key.substring(4)+".html\',");
S
tring ids =
buffer.toS
tring().substring(0,buffer.toString().length()-1);
/
/
S
Q
L
s
q
l+
="
A
ND uri NOT IN("+ids+")";
//排序部分组装
sql+=" ORDER BY __time DESC) GROUP BY uri HAVING \\"viewcount\\">1 LIMIT
5000";
return sql;
@ElasticSimpleJob(
cron = "1/5 * * * * ?",
jobName = "monitortask",
shardingTotalCount = 1
)
@Component
public class MonitorTask implements SimpleJob
@Autowired
private MonitorItemsAccess monitorItemsAccess;
/***
* 执行任务
* @param shardingContext
*/
@Override
public void execute(ShardingContext shardingContext)
try
List<String> ids = monitorItemsAccess.loadData();
catch (Exception e)
e.printStackTrace();
 
 
3.2.2 实时热点数据隔离
热点数据隔离,需要考虑很多问题,首先要将商品从数据库中进行锁定,然后将商品数据导入到
Redis,导入到Redis的时候,需要支持事务操作。
1)Service
在 seckill-goods 的 com.seckill.goods.service.SkuService 中添加隔离方法,代码如下:
在 com.seckill.goods.service.impl.SkuServiceImpl 中添加隔离实现方法:
/***
* 热点商品隔离
* @param id
*/
void hotIsolation(String id);
@Autowired
private RedisTemplate redisTemplate;
/***
* 热点商品隔离
*/
@Override
public void hotIsolation(String id)
Sku sku = new Sku();
sku.setIslock(2);
Example example = new Example(Sku.class);
Example.Criteria criteria = example.createCriteria();
criteria.andEqualTo("islock",1);
criteria.andEqualTo("id",id);
//执行锁定
int mcount = skuMapper.updateByExampleSelective(sku,example);
if(mcount>0)
//查询商品剩余库存
Sku currentSku = skuMapper.selectByPrimaryKey(id);
 
 
2)Controller
在 seckill-goods 的 com.seckill.goods.controller.SkuController 中添加隔离方法调用,代码
如下:
3)Feign
在 seckill-goods-api 的 com.seckill.goods.feign.SkuFeign 中添加,代码如下:
4)热点数据隔离调用
在 seckill-monitor 的 com.seckill.monitor.task.MonitorTask 中添加隔离方法调用,代码如
下:
//剩余库存
String prefix = "SKU_";
redisTemplate.boundHashOps(prefix+id).increment("num",currentSku.getSeckillNum(
));
//提取Sku的信息
Map<String,Object> skuMap = new HashMap<String,Object>();
skuMap.put("id",id);
skuMa
p
.
p
u
t
(
"
p
r
i
ce
"
,c
u
r
re
nt
S
ku
.g
et
S
e
ck
i
ll
Price());
skuMa
p
.
p
u
t
(
"
na
m
e
"
,c
u
r
re
nt
S
ku
.g
et
Na
m
e
()
)
;
re
disTemplate.boundHashOps(prefix+id).put("info",skuMap);
/***
* 热点商品隔离
*/
@PostMapping(value = "/hot/isolation")
public Result hotIsolation(@RequestParam List<String> ids)
if(ids!=null && ids.size()>0)
for (String id : ids)
skuService.hotIsolation(id);
return new Result(true,StatusCode.OK,"热点商品隔离成功!");
/***
* 热点商品隔离
*/
@PostMapping(value = "/sku/hot/isolation")
Result hotIsolation(@RequestParam List<String> ids);
 
 
5)测试
我们启动整个服务进行测试,Redis中的数据如下:
3.3 Redis集群事务问题
Redis集群是不具备事务的,单个节点是具备事务的,所以我们商品信息存储到Redis集群多个节点中是
没法实现集群事务控制,上面的代码如下图:
我们观察上面代码,①和②处其实key相同,既然key相同,那么数据一定不是存储在不同节点上,如果
把2次操作Redis合成一次操作Reids,就不会有事务问题了,我们可以把上面代码改造一下即可解决事
务问题,改造代码如下图:
 
 
4 用户登录
用户抢单的时候,必须要先登录,我们先编写一个方法,用于实现用户登录,用户登录成功后,每次抢
单的时候,还需要识别用户身份,我们这里采用JWT令牌保存用户身份信息,每次抢单识别JWT令牌即
可。
4.1 Jwt令牌
JWT令牌这里我们将实现管理员令牌生成和普通用户令牌生成,管理员和普通用户他们生成了令牌的秘
钥一定是不同的。
在 seckill-common 工程中添加JWT令牌生成类 com.seckill.util.JwtTokenUtil ,在该类中实现令
牌生成以及令牌解析,代码如下:
public class JwtTokenUtil
//秘钥
public static final String
SECRETUSER="5pil6aOO5YaN576O5Lmf5q+U5LiN5LiK5bCP6ZuF55qE56yR";//用户
public static final String
SECRETADMIN="ADMIN5pil6aOO5YaN576O5Lmf5q+U5LiN5LiK5bCP6ZuF55qE56yR";//管理员
/***
* 生成令牌-管理员
* @param uid:唯一标识符
* @param ttlMillis:有效期
* @return
* @throws Exception
*/
public static String generateTokenAdmin(String uid,Map<String,Object>
payload, long ttlMillis) throws Exception
return generateToken(uid,payload,ttlMillis,SECRETADMIN);
/***
* 生成令牌-普通用户
* @param uid:唯一标识符
* @param ttlMillis:有效期
 
 
* @return
* @throws Exception
*/
public static String generateTokenUser(String uid,Map<String,Object>
payload, long ttlMillis) throws Exception
return generateToken(uid,payload,ttlMillis,SECRETUSER);
/***
* 生成令牌
* @param
uid:唯一标识符
* @pa
ra
m
ttlMillis:有效期
* @re
t
u
r
n
* @
throws Exception
*/
public static String generateToken(String uid,Map<String,Object> payload,
long ttlMillis,String secret) throws Exception
SignatureAlgorithm signatureAlgorithm = SignatureAlgorithm.HS256;
long nowMillis = System.currentTimeMillis();
Date now = new Date(nowMillis);
Key signingKey = new SecretKeySpec(secret.getBytes(),
signatureAlgorithm.getJcaName());
Map<String,Object> header=new HashMap<String,Object>();
header.put("typ","JWT");
header.put("alg","HS256");
JwtBuilder builder = Jwts.builder().setId(uid)
.setIssuedAt(now)
.setIssuer(uid)
.setSubject(uid)
.setHeader(header)
.signWith(signatureAlgorithm, signingKey);
//设置载体
builder.addClaims(payload);
if (ttlMillis >= 0)
long expMillis = nowMillis + ttlMillis;
Date exp = new Date(expMillis);
builder.setExpiration(exp);
return builder.compact();
/***
* 解密JWT令牌
*/
public static Map<String, Object> parseToken(String token)
//以Bearer开头处理
if(token.startsWith("Bearer"))
token=token.substring(6).trim();
//秘钥处理
SignatureAlgorithm signatureAlgorithm = SignatureAlgorithm.HS256;
Key signingKey = new SecretKeySpec(SECRETUSER.getBytes(),
signatureAlgorithm.getJcaName());
 
 
4.2 用户登录
在 seckill-us
er 中实现用户登录,用户登录表机构如下:
1)Service
在 com.seckill.user.service.UserService 中编写登录方法,代码如下:
在 com.seckill.user.service.impl.UserServiceImpl 中编写登录方法实现,代码如下:
Claims claims = Jwts.parser()
.setSigningKey(signingKey)
.parseClaimsJws(token)
.getBody();
return claims;
CREATE T
ABLE `tb_user` (
`username` varchar(50) NOT NULL COMMENT \'用户名\',
`password` varchar(100) NOT NULL COMMENT \'密码,加密存储,MD5加密\',
`phone` varchar(20) DEFAULT NULL COMMENT \'注册手机号\',
`email` varchar(50) DEFAULT NULL COMMENT \'注册邮箱\',
`created` datetime NOT NULL COMMENT \'创建时间\',
`updated` datetime NOT NULL COMMENT \'修改时间\',
`nick_name` varchar(50) DEFAULT NULL COMMENT \'昵称\',
`name` varchar(50) DEFAULT NULL COMMENT \'真实姓名\',
PRIMARY KEY (`username`),
UNIQUE KEY `username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT=\'用户表\';
/**
* 根据ID查询User
* @param id
* @return
*/
User findById(String id);
/**
* 根据ID查询User
* @param id
* @return
*/
@Override
public User findById(String id)
return userMapper.selectByPrimaryKey(id);
 
 
2)Controller
在 com.seckill.user.controller.UserController 中编写登录实现方法,代码如下:
我们可以生成一个令牌。
登录地址:http://localhost:8001/api/user/login?username=itheima&password=itheima
生成的令牌如下:
4.3 Jwt令牌识别
/***
* 根据ID查询User数据
* @return
*/
@GetMapping("
/
l
o
g
i
n
"
)
public Result
<
U
s
e
r
>
f
in
d
By
Id
(
St
ri
ng
us
ername,String password) throws Exception
//调用User
S
e
r
v
i
c
e
U
se
r
User
us
e
r
=
u
se
rService.findById(username);
if(use
r=
=
n
u
ll
)
return new Result<User>(false,StatusCode.ERROR,"用户不存在");
if(!user.getPassword().equals(DigestUtils.md5DigestAsHex(password.getBytes())))
return new Result<User>(false,StatusCode.ERROR,"密码错误");
//登录成功,生成令牌
Map<String,Object> payload = new HashMap<String,Object>();
payload.put("username",user.getUsername());
payload.put("name",user.getName());
payload.put("phone",user.getPhone());
//生成令牌
String jwt
=JwtTokenUtil.generateTokenUser(UUID.randomUUID().toStrin

模板与泛型编程

16.1.1函数模板

//template parameter list
template int compare(const T1& v1, const T2&v2) {     if (v1 < v2) return -1;     if (v2 < v1) return 1;     return 0; }

When we call a function template, the compiler (ordinarily) uses the arguments of the call to deduce the template parameter(s) for us.

The compiler uses the deduced template parameter(s) to instantiatea specific version of the function for us.

模板类型参数

函数模板的参数列表必须明确标明typename或者class,两个是同义的(typename在模板引入C++之后才出现,很多早先的程序使用class)

非类型模板参数

// 整数
template int compare(const char(&p1)[N], const char(&p2)[M]) {     strcpy(p1,p2); } // 指针 template<const char* C> void func1(const char* str) {     cout << C << " " << str << endl; }  // 引用 template<char(&R)[9]> void func2(const char* str) {     cout << R << " " << str << endl; }  // 函数指针 template<void(*f)(const char*)> void func3(const char* c) {     f(c); } void print(const char* c) { cout << c << endl; } char arr[9] = "template";   // 全局变量,具有静态生存期 int main() {     func1("pointer");     func2("reference");     func3("template function pointer");     return 0; }

绑定到非类型整型的实参必须是一个常量表达式,绑定到指针或引用的非类型参数的实参必须有静态的生存期(比如全局变量)

inline和constexpr的函数模板

template inline int compare(const T1& v1, const T2&v2); template constexpr int compare(const T1& v1, const T2&v2);

 

对于函数模板,编译器可以通过参数推断,得出模板中的类型信息,因而在使用函数模板时可以省略

16.1.2类模板

对于函数模板,编译器可以通过参数推断,得出模板中的类型信息,因而在使用函数模板时可以省略。对于类模板,必须在模板后的尖括号中提供信息,代替模板参数实参列表。

  1. 类模板中,模板参数被被当做stand-ins(替身),使用时被替换
  2. 类模板中的成员函数只有在使用时才会被实例化,这使的某种类型即使不完全符合模板要求,仍然可以使用该类型的模板。
  3. 实例化后的类型为ClassName类型。
  4. 类包含友元时,模板类包含非模板友元,则友元可以访问模板的所有类型实例;友元是模板,则所有的友元实例都可以访问模板类的所有实例。
  5. 对于static成员,每一个不同类型实例,都有不同的static成员,相同类型的实例实例化出来的对象共享static成员。
技术分享
#include #include  using namespace std;  template  class BlobPtr;// forward declarations needed for friend declarations in Blob template  class Blob; // needed for parameters in operator==  template  bool operator==(const Blob&, const Blob&); template class Blob {     // each instantiation of Blob grants access to the version of     // BlobPtr and the equality operator instantiated with the same type     friend class BlobPtr;     friend bool operator== (const Blob&, const Blob&); public:     using size_type = vector::size_type;     //construct function     Blob():data(make_shared
类模板示例

16.1.3模板参数

模板参数的作用域在声明之后定义结束之前。

模板参数可以覆盖外部作用域的类型参数。

模板参数可以有默认值

使用类的类型成员时需要使用typename关键字,如typename T::value_type()

//默认模板参数
templateint> //使用参数类型的类型成员作为返回值的类型 typename vector::size_type differ(vector& lh, vector& rh) {     return lh.size() - rh.size(); }

16.1.4类的成员也可以是模板

普通类中的成员可以为模板

模板类中的成员可以为模板

//为了在unique_ptr析构对象的时候打印出来信息
class DebugDelete { public:     DebugDelete(std::ostream&s = std::cerr) :os(s) {}     template  void operator()(T*p)const     {         os << "deleting unique_ptr" << std::endl;         delete p;     } private:     std::ostream &os; }; int main() {     //会生成一个int版本的DebugDelete实例     unique_ptr<int, DebugDelete> p(new int, DebugDelete());     return 0; }

16.1.5控制实例化

模板被使用的时候才会实例化,其实例可能出现在多个对象文件中,当独立编译的原文件使用相同的模板并提供了相同的模板参数,每个文件都会有一个实例。为了解决这个开销,可以使用显式实例化。

extern template declaration; //实例化的声明

template declaration;//实例化的定义

编译器遇到external声明,将会在其他位置寻找定义,但在其他位置必须有一个定义。

实例化定义会实例化所有成员,所以类型必须能够引用所有成员函数

16.2模板实参推断

  1. 在调用模板的时候,一般不会发生参数类型的转换。会发生的转换有两种,非const转换成const,数组或函数名转换成指针。
  2. 如果函数参数不是模板参数类型,这个参数是可以进行类型转换的
  3. 使用相同模板参数类型的函数形参类型必须相同,不会进行参数转换
  4. 当模板实参类型无法推断,例如返回值类型时,必须显式指定。(通过fun
  5. 显式指定了模板实参类型的,也可以进行类型转换(fun(15),15转换为double)。

16.2.3尾置返回类型与类型转换

有时候返回值的类型与参数的类型有关,这时候可以用尾置返回类型

template  auto fcn(It beg, It end) -> decltype(*beg) {     return *beg; }

由于解引用运算返回的是引用类型,所以返回值是引用类型。

如果要返回的是值,则使用remove_reference(是一个转换模板,定义在type_traits头文件中)。

template   //第二个typename表示type成员为类型 auto fcn(It beg, It end) -> typename remove_reference

其他的类型转换模板请参见这里

16.2.6理解std::move

Std::move将一个对象转变为右值引用对象。

template<class _Ty> inline constexpr typename remove_reference<_Ty>::type&& move(_Ty&& _Arg) noexcept {    // forward _Arg as movable
    return (static_cast::type&&>(_Arg)); }

在调用时

std::string s1("hi!"), s2; s2 = std::move(std::string("bye!")); s2 = std::move(s1);

使用调用

s2 = std::move(std::string("bye!"));
  1. 推断出模板类型_Ty为string
  2. remove_reference<_Ty>::type为string
  3. move返回类型为string&& 

使用调用

s2 = std::move(s1);
  1. 推断出模板类型_Ty为string&
  2. remove_reference<_Ty>::type为string
  3. move返回类型为string& &&,折叠后为string&
  4. move函数参数t实例化为string& &&,会折叠为string&

16.2.7转发

某些函数需要将其一个或多个实参和类型不变地转发给其他函数,包括是否是const以及左值还是右值。

这个模板将一个函数的参数转发到里边的函数。

template void flip1(F f, T1 t1, T2, t2) {     f(t2, t1); }

这个模板在值传递的实例中是正确的,但是在如下调用:

function<void(int, int&)> fun = [](int a, int& b) {cout << a << ++b << endl; }; int a = 1, b = 2; flip1(fun, a, b); cout << a << b << endl;

你会发现,函数并没有改变第二个参数的值,没有将引用这个参数类型传入。

 

修改模板为

template void flip1(F f, T1&& t1, T2&& t2) {     f(t1, t2); }

这也会存在一些情况下的错误

function<void(int&&, int&)> fun = [](int &&a, int& b) {cout << a << ++b << endl; }; int a = 1; flip1(fun, 1, a);

1为右值传递到模板flip1中后,作为一个变量的形参,会成为一个左值(变量是左值),左值绑定到fun的第一个右值参数时,会出现错误。

 

再次修改为

template void flip1(F f, T1&& t1, T2&& t2) {     f(std::forward(t1), std::forward(t2)); }

使用forward,将返回类型为T&&的结果,如果出入为值,则返回为右值,传入为左值引用,则引用折叠后返回左值引用。

16.3重载与模板

如下两个模板

template std::string debug_rep(const T& t) {     std::ostringstream ret;     ret << t;     return ret.str(); } template std::string debug_rep(T* p) {     std::ostringstream ret;     ret << "pointer: " << p << " " << (p ? debug_rep(*p) : "null pointer");     return ret.str(); }

如果有调用

int a = 1; std::cout << debug_rep(a) << std::endl;

只有第一个模板时可行的,则选择第一个实例化

 

如果有调用

int a = 1; std::cout << debug_rep(&a) << std::endl;

则有

debug_rep(const int*&) debug_rep(int*)

这两个版本的实例化,由于第一个需要有非const转换到const,所以选择了第二个版本。

 

如果有调用

const int* a = &b; std::cout << debug_rep(a) << std::endl;

则上述两个版本都是精确的,然而debug_rep(T* p)对于这个调用来讲,是更特例的版本,只针对于指针,所以选择最特例话的版本。(从另一个角度讲,(const T& t)适用任何类型,这样debug_rep(T* p)永远不会被使用,这条规则才能使其被使用)

 

另外,非模板函数会优先匹配

16.4可变参数模板

一个可变参数模板(variadic template)中可变数目的参数被称为参数包(parameter packet),分为两种参数包:模板参数包(template parameter packet)、函数参数包(function parameter)。

// Args is a template parameter pack; rest is a function parameter pack // Args represents zero or more template type parameters // rest represents zero or more function parameters
template  void foo(const T &t, const Args& ... rest) {     cout << sizeof...(Args) << endl;  // number of type parameters     cout << sizeof...(args) << endl;  // number of function parameters }

用typename…表示零或多个类型的列表,一个类型名后边跟省略号,表示非类型参数的列表。如果一个函数参数的类型是模板参数包,则此参数是一个函数参数包,可以用sizeof…求出参数的个数。

16.4.1例子,打印所有参数

第一个模板在第二个模板递归调用的最后一次中,时最为匹配的,调用时终止了第二个模板的递归。

// function to end the recursion and print the last element // this function must be declared before the variadic version of print is defined
template ostream &print(ostream &os, const T &t) {     return os << t; // no separator after the last element in the pack } // this version of print will be called for all but the last element in the pack template  ostream &print(ostream &os, const T &t, const Args&... rest) {     os << t << ", ";  // print the first argument     return print(os, rest...); // recursive call; print the other arguments }

16.4.2包扩展

template  ostream &print(ostream &os, const T &t, const Args&... rest) //扩展Args,生成参数列表 {     os << t << ", ";       return print(os, rest...); //扩展实参,形成实参列表 }

在扩展实参的时候,还可以这样用

print(os, debug_rep(rest)...);

可以将扩展出来的每一个参数,执行函数调用

return print(os, debug_rep(p1), debug_rep(p2), debug_rep(p3));

16.4.3转发参数包

// fun haszero or more parameters each of which is // an rvalue reference to a template parameter type
template

16.5模板特例化

// firstversion; can compare any two types
template  int compare(const T&, const T&); // second version to handle string literals template int compare(const char(&)[N], const char(&)[M]);

定义的这两个模板,可以比较任意对象类型。但是,如果传入的是字符串类型的指针(char*)时,回到第一个模板中调用,然而比较了参数指针的大小。

为了能够处理字符指针的类型,可以定义一个特列

// specialversion of compare to handle pointers to character arrays
template <>
int compare(const char* const &p1, const char* const &p2) {     return strcmp(p1, p2); }

当提供一个特例化,其类型必须与之前的模板参数中对应的类型匹配。如上是为了特例化第一个模板template int compare(const T&, const T&);了,其参数是顶层const的,而对于指针类型char*,其参数是底层const,所以在特例化的时候为const char* const &p1。

 

需要注意的是,特例化本质是实例化一个模板,而非重载。因此特例化不影响函数匹配。这与将其定义为非模板函数是有区别的。

加入定义了非模板函数,这样在传入字符常量的时候,例如

compare("hi", "mom");

会匹配到我们定义的非模板函数,而这个实际上我们希望匹配到int compare(const char(&)[N], const char(&)[M]); 

注意,特例化必须声明在作用域中,故声明在一个头文件中

 

类模板的特例化

如果自定义类需要使用无序容器,必须定义自己的hasher版本,或者特例化hash版本,一个特例化的hash类必须定义:

  1. 一个重载调用运算符,接收一个容器关键字类型的对象,返回一个size_t
  2. 两个类型成员,result_type和argument_type,分别为调用运算符的返回类型和参数类型
  3. 默认构造函数和拷贝赋值运算符(可以隐式定义)。

注意,特例化需要与被特例化的模板在同一个作用域,所以使用namespace参数。

示例:对Sales_data

// openthe std namespace so we can specialize std::hash
namespace std  {     template <>  // we‘re defining a specialization with
    struct hash // the template parameter of Sales_data     {         // the type used to hash an unordered container must define these types         typedef size_t result_type;         typedef Sales_data argument_type; // by default, this type needs ==         size_t operator()(const Sales_data& s) const;         // our class uses synthesized copy control and default constructor     };     size_t hash::operator()(const Sales_data& s) const     {         //下边是hash函数的算法,使用了string、uint、double的hash算法         return hash<string>()(s.bookNo) ^             hash()(s.units_sold) ^             hash<double>()(s.revenue);     } }

注意,有时候会使用私有成员进行hash,所以声明为友元

// needed for the friend declaration
template  class std::hash;   class Sales_data {     friend class std::hash;     // other members as before };

 

类模板的部分特例化

我们可以部分特例化类模板,不能部分特例化函数模板

template <class T> struct remove_reference {     typedef T type; }; // partialspecializations that will be used for lvalue and rvalue references
template <class T> struct remove_reference

特例化模板的成员

template  struct Foo {     Foo(constT &t = T()) : mem(t) { }     void Bar() {  }     T mem;     // other members of Foo }; template<>  // we‘re specializing a template void Foo<int>::Bar() // we‘re specializing the Bar member of Foo {     // do whatever specialized processing that applies to ints }

当使用int类型的模板时,会使用这个特例化的Bar()函数成员。

以上是关于1111111111111111222222222的主要内容,如果未能解决你的问题,请参考以下文章