微服务网关redis流量统计
Posted 低调的骏马
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务网关redis流量统计相关的知识,希望对你有一定的参考价值。
微服务网关(十二)redis流量统计中间件
redis流量统计中间件
redis详细:
redis.go:
// RedisConfPipline redis连接的设置方法,例如在流量统计中间件中设置数据和超时时间
func RedisConfPipline(pip ...func(c redis.Conn)) error
// 读取配置文件,创建连接
c, err := lib.RedisConnFactory("default")
if err != nil
return err
defer c.Close()
for _, f := range pip
f(c)
c.Flush()
return nil
// RedisConfDo redis执行操作的方法,例如在流量统计中间件中使用get方法获取redis中储存的流量数据
func RedisConfDo(commandName string, args ...interface) (interface, error)
c, err := lib.RedisConnFactory("default")
if err != nil
return nil, err
defer c.Close()
return c.Do(commandName, args...)
统计Handler:
flow_count_handler.go
单例模式设计,共用一个,避免重复创建,也为了使一个服务中的数据可以重复地累计
var FlowCounterHandler *FlowCounter
type FlowCounter struct
RedisFlowCountMap map[string]*RedisFlowCountService
RedisFlowCountSlice []*RedisFlowCountService
Locker sync.RWMutex
func NewFlowCounter() *FlowCounter
return &FlowCounter
RedisFlowCountMap: map[string]*RedisFlowCountService,
RedisFlowCountSlice: []*RedisFlowCountService,
Locker: sync.RWMutex,
func init()
FlowCounterHandler = NewFlowCounter()
func (counter *FlowCounter) GetCounter(serverName string) (*RedisFlowCountService, error)
for _, item := range counter.RedisFlowCountSlice
if item.AppID == serverName
return item, nil
//NewRedisFlowCountService 创建统计器
newCounter := NewRedisFlowCountService(serverName, 1*time.Second)
counter.RedisFlowCountSlice = append(counter.RedisFlowCountSlice, newCounter)
counter.Locker.Lock()
defer counter.Locker.Unlock()
counter.RedisFlowCountMap[serverName] = newCounter
return newCounter, nil
业务核心:
redis_flow_count.go
// RedisFlowCountService 流量统计器结构体
type RedisFlowCountService struct
AppID string
Interval time.Duration
QPS int64
Unix int64
TickerCount int64
TotalCount int64
// NewRedisFlowCountService 参数:设置APPID和统计结果的刷新时间频率
func NewRedisFlowCountService(appID string, interval time.Duration) *RedisFlowCountService
reqCounter := &RedisFlowCountService
AppID: appID,
Interval: interval,
QPS: 0,
Unix: 0,
go func()
defer func()
if err := recover(); err != nil
fmt.Println(err)
()
ticker := time.NewTicker(interval)
for
<-ticker.C
//获取请求次数reqCounter.TickerCount-->0,1,2,3,4...次
tickerCount := atomic.LoadInt64(&reqCounter.TickerCount)
//重置请求次数reqCounter.TickerCount-->0
atomic.StoreInt64(&reqCounter.TickerCount, 0)
currentTime := time.Now()
dayKey := reqCounter.GetDayKey(currentTime)
hourKey := reqCounter.GetHourKey(currentTime)
if err := RedisConfPipline(func(c redis.Conn)
//数据增加并写入tickerCount--->+0\\+1\\+2...
c.Send("INCRBY", dayKey, tickerCount)
//超时时间设置
c.Send("EXPIRE", dayKey, 86400*2)
c.Send("INCRBY", hourKey, tickerCount)
c.Send("EXPIRE", hourKey, 86400*2)
); err != nil
fmt.Println("RedisConfPipline err", err)
continue
totalCount, err := reqCounter.GetDayData(currentTime)
if err != nil
fmt.Println("reqCounter.GetDayData err", err)
continue
nowUnix := time.Now().Unix()
if reqCounter.Unix == 0
reqCounter.Unix = time.Now().Unix()
continue
tickerCount = totalCount - reqCounter.TotalCount
if nowUnix > reqCounter.Unix
reqCounter.TotalCount = totalCount
reqCounter.QPS = tickerCount / (nowUnix - reqCounter.Unix)
reqCounter.Unix = time.Now().Unix()
()
return reqCounter
func (o *RedisFlowCountService) GetDayKey(t time.Time) string
dayStr := t.In(lib.TimeLocation).Format("20060102")
//设置到redis的key中
return fmt.Sprintf("%s_%s_%s", RedisFlowDayKey, dayStr, o.AppID)
func (o *RedisFlowCountService) GetHourKey(t time.Time) string
hourStr := t.In(lib.TimeLocation).Format("2006010215")
return fmt.Sprintf("%s_%s_%s", RedisFlowHourKey, hourStr, o.AppID)
// GetHourData 封装的获取方法 redis的get获取数据
func (o *RedisFlowCountService) GetHourData(t time.Time) (int64, error)
return redis.Int64(RedisConfDo("GET", o.GetHourKey(t)))
func (o *RedisFlowCountService) GetDayData(t time.Time) (int64, error)
return redis.Int64(RedisConfDo("GET", o.GetDayKey(t)))
中间件详细:
http_flow_count.go
func HTTPFlowCountMiddleware() gin.HandlerFunc
return func(c *gin.Context)
serverInterface, ok := c.Get("service")
if !ok
middleware.ResponseError(c, 2001, errors.New("service not found"))
c.Abort()
return
//从上下文中获取后转换
serviceDetail := serverInterface.(*dao.ServiceDetail)
//统计项 1 全站 2 服务
//1、全站
totalCounter, err := public.FlowCounterHandler.GetCounter(public.FlowTotal)
if err != nil
middleware.ResponseError(c, 4001, err)
c.Abort()
return
totalCounter.Increase()
//_, _ = totalCounter.GetDayData(time.Now())
//fmt.Printf("totalCounter qps:%v,dayCount:%v\\n", totalCounter.QPS, dayCount)
//2、服务
serviceCounter, err := public.FlowCounterHandler.GetCounter(public.FlowServicePrefix + serviceDetail.Info.ServiceName)
if err != nil
middleware.ResponseError(c, 4001, err)
c.Abort()
return
serviceCounter.Increase()
//_, _ = serviceCounter.GetDayData(time.Now())
//fmt.Printf("serviceCounter qps:%v,dayCount:%v", serviceCounter.QPS, dayServiceCount)
c.Next()
中间件代码解释 | 一次流量统计功能完成的流程:
本段代码中,由public.FlowCounterHandler.GetCounter方法获取服务的流量统计,返回目标服务的流量统计器。从目标服务的流量统计器中,即可获取到从Redis中读取到的请求次数
同时,调用totalCounter.Increase()方法,原子增加流量统计器中的TickerCount值,由0增到1,
此时,在redis_flow_count.go的NewRedisFlowCountService方法的无限for循环中,就会在提取出tickerCount的值后,将其重新置为0。接着,将提取出的tickerCount通过INCRBY命令语句,写入Redis中,接着flush刷新,该数据便写入Redis中了。完成一次流量统计。
补充:SETNX分布式锁
写入redis,试用SETNX分布式锁实现实现写入,可以避免读取和修改数据造成结果混乱
使用场景:
通常在分布式系统中,我们经常会从数据库中读取数据和修改数据,然而这不是一个原子操作,在并发时就会导致数据的不正确,例如一会下面的电商秒杀,库存数量的更新就会遇到。
redis是单线程的,单纯的使用同步锁只能保证单体系统下正常运行,但是在微服务架构下没法保证,所以要使用setnx分布式锁实现写入
基本语法:
Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。
redis> EXISTS job # job 不存在
(integer) 0
redis> SETNX job "programmer" # job 设置成功
(integer) 1
redis> SETNX job "code-farmer" # 尝试覆盖 job ,失败
(integer) 0
redis> GET job # 没有被覆盖
"programmer"
说人话就是:使用setnx命令设置了一个key,之后再次设置覆盖就会报错,除非将这个key删除了,才能重新设置
这样的一个特性就可以用于加锁使得数据同步的功能上
- 使用setnx命令设置key相当于加锁;
- 执行业务代码
- 删除key就相当于解锁
但是单纯这么使用还有缺陷去,一旦中间的业务代码操作出现了异常,就会导致程序无法解锁,而其他请求也会一直无法拿到key,造成程序逻辑死锁
这时可以采取捕获异常的方式解决,保证即使上述逻辑出问题,也能del掉
问题:
- 在执行上锁后有一台服务器出现宕机、断电,导致异常无法抛出,key一直存在,仍会导致死锁
解决办法:
上锁的同时,利用原子性的操作设置key的时长,过期后就抛出异常
- 如果减库存的操作时间很长,超出的锁的过期时长,应该如何操作??????????
SpringCloud服务网关Zuul分析①分发
参考技术A 在SpringCloud中充当服务网关的角色,它包含了鉴权、流量转发、请求统计等等功能Filter是Zuul的核心,用来实现对外服务的控制。Filter的生命周期有4个,分别是 “PRE”、“ROUTING”、“POST”、“ERROR” ,整个生命周期可以用下图来表示。
PRE: 这种过滤器在请求被路由之前调用。我们可利用这种过滤器实现身份验证、在集群中选择请求的微服务、记录调试信息等。
ROUTING: 这种过滤器将请求路由到微服务。这种过滤器用于构建发送给微服务的请求,并使用Apache HttpClient或Netfilx Ribbon请求微服务。
POST: 这种过滤器在路由到微服务以后执行。这种过滤器可用来为响应添加标准的HTTP Header、收集统计信息和指标、将响应从微服务发送给客户端等。
ERROR: 在其他阶段发生错误时执行该过滤器。 除了默认的过滤器类型,Zuul还允许我们创建自定义的过滤器类型。例如,我们可以定制一种STATIC类型的过滤器,直接在Zuul中生成响应,而不将请求转发到后端的微服务。
根据场景需要,我们也可以自定义一些filter,穿插在整个过程的某个阶段,只需要继承ZuulFilter,并且覆盖里面的4个方法就可以了.
application.properties中配置:
# 禁用一些Filter的配置:
zuul.FormBodyWrapperFilter.pre.disable= true
# 路由的配置:
# 配置需要被跳转的地址,/user/**的网址将被分发
zuul.routes.user.path=/user/**
# 重定向的地址:
zuul.routes.user.url=http://127.0.0.1:8081/
以上是关于微服务网关redis流量统计的主要内容,如果未能解决你的问题,请参考以下文章