Go实现简单负载均衡
Posted 360技术
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Go实现简单负载均衡相关的知识,希望对你有一定的参考价值。
奇技指南
今天小编为大家分享一篇关于Go实现简单的负载均衡器的文章,只是对负载均衡进行了基础的功能实现,有助于对负载均衡的理解。如果有兴趣,也可以以此为基础进行功能扩展,希望能对大家有所帮助。
本文转载自360云计算
在使用过像 nginx 之类的专业的负载均衡之后,为了加深对负载均衡的原理理解,本次我们使用 Golang 也来实现一个简单的负载均衡。
工作原理
-
轮询 - 平均的分配负载,假定所有后端服务具有相同的处理能力 -
加权轮询 - 根据后端服务的处理能力,可以赋予相应权重 -
最少连接 - 负载分配到活跃连接最少的服务器上
轮询在实现方面非常简单,它以均等的机会让后端服务轮流执行请求任务。
如图所示,请求周期性的轮流转发给后端服务。但是我们不能直接这样简单来实现,需要考虑其他因素。
如果后端服务宕机了怎么办?我们肯定不想把流量转发到这台挂掉的节点上。因此,除非有附加条件,否则不能直接轮流转发负载。我们需要将流量仅路由到已启动并正常运行的后端服务节点上。
2
数据结构
type Backend struct {
URL *url.URL
Alive bool
mux sync.RWMutex
ReverseProxy *httputil.ReverseProxy
}
type ServerPool struct {
backends []*Backend
current uint64
}
3
ReverseProxy
u, _ := url.Parse("http://localhost:8080")
rp := httputil.NewSingleHostReverseProxy(u)
// initialize your server and add this as handler
http.HandlerFunc(rp.ServeHTTP)
使用 httputil.NewSingleHostReverseProxy(url) 我们可以初始化一个反向代理,它将请求转发给传入的 url。在上面的示例中,所有请求都将转发到 localhost:8080 上,并将结果返回给原始请求端。
对于我们简单的负载均衡实现,我们可以使用后端服务的 URL 来初始化 ReverseProxy,以便 ReverseProxy 将请求发送到URL。
4
选择过程
为了满足该要求,理想的解决方案是原子的增加。可以通过 Go 的原子包来实现。
func (s *ServerPool) NextIndex() int {
return int(atomic.AddUint64(&s.current, uint64(1)) % uint64(len(s.backends)))
}
5
取回一个存活后端
如上图所示,我们想从整个列表的下一个元素开始遍历,这可以简单地通过遍历 next + length 来完成。但是要选择一个索引,我们希望将其限制在切片长度之间。 通过修改可以轻松完成此操作。
// GetNextPeer returns next active peer to take a connection
func (s *ServerPool) GetNextPeer() *Backend {
// loop entire backends to find out an Alive backend
next := s.NextIndex()
l := len(s.backends) + next // start from next and move a full cycle
for i := next; i < l; i++ {
idx := i % len(s.backends) // take an index by modding with length
// if we have an alive backend, use it and store if its not the original one
if s.backends[idx].IsAlive() {
if i != next {
atomic.StoreUint64(&s.current, uint64(idx)) // mark the current one
}
return s.backends[idx]
}
}
return nil
}
6
处理竞争
// SetAlive for this backend
func (b *Backend) SetAlive(alive bool) {
b.mux.Lock()
b.Alive = alive
b.mux.Unlock()
}
// IsAlive returns true when backend is alive
func (b *Backend) IsAlive() (alive bool) {
b.mux.RLock()
alive = b.Alive
b.mux.RUnlock()
return
}
7
请求处理
// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
peer := serverPool.GetNextPeer()
if peer != nil {
peer.ReverseProxy.ServeHTTP(w, r)
return
}
http.Error(w, "Service not available", http.StatusServiceUnavailable)
}
方法可以简单地作为 HandlerFunc 传递给 http 服务器。
server := http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: http.HandlerFunc(lb),
}
8
只路由到健康节点
proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
log.Printf("[%s] %s ", serverUrl.Host, e.Error())
retries := GetRetryFromContext(request)
if retries < 3 {
select {
case <-time.After(10 * time.Millisecond):
ctx := context.WithValue(request.Context(), Retry, retries+1)
proxy.ServeHTTP(writer, request.WithContext(ctx))
}
return
}
// after 3 retries, mark this backend as down
serverPool.MarkBackendStatus(serverUrl, false)
// if the same request routing for few attempts with different backends, increase the count
attempts := GetAttemptsFromContext(request)
log.Printf("%s(%s) Attempting retry %d ", request.RemoteAddr, request.URL.Path, attempts)
ctx := context.WithValue(request.Context(), Attempts, attempts+1)
lb(writer, request.WithContext(ctx))
}
我们利用闭包设计错误处理程序。它允许我们将外部变量捕获到方法中。它将检查现有的重试计数,如果小于3,我们将再次向相同的后端发送相同的请求。
// lb load balances the incoming request
func lb(w http.ResponseWriter, r *http.Request) {
attempts := GetAttemptsFromContext(r)
if attempts > 3 {
log.Printf("%s(%s) Max attempts reached, terminating ", r.RemoteAddr, r.URL.Path)
http.Error(w, "Service not available", http.StatusServiceUnavailable)
return
}
peer := serverPool.GetNextPeer()
if peer != nil {
peer.ReverseProxy.ServeHTTP(w, r)
return
}
http.Error(w, "Service not available", http.StatusServiceUnavailable)
}
使用 context
const (
Attempts int = iota
Retry
)
然后,我们可以像使用 HashMap 一样来检索值。
// GetAttemptsFromContext returns the attempts for request
func GetRetryFromContext(r *http.Request) int {
if retry, ok := r.Context().Value(Retry).(int); ok {
return retry
}
return 0
}
9
被动健康检查
// isAlive checks whether a backend is Alive by establishing a TCP connection
func isBackendAlive(u *url.URL) bool {
timeout := 2 * time.Second
conn, err := net.DialTimeout("tcp", u.Host, timeout)
if err != nil {
log.Println("Site unreachable, error: ", err)
return false
}
_ = conn.Close() // close it, we dont need to maintain this connection
return true
}
现在,我们可以递归检查节点并标记状态。
// HealthCheck pings the backends and update the status
func (s *ServerPool) HealthCheck() {
for _, b := range s.backends {
status := "up"
alive := isBackendAlive(b.URL)
b.SetAlive(alive)
if !alive {
status = "down"
}
log.Printf("%s [%s] ", b.URL, status)
}
}
定期执行,可以在 Go 中启动一个计时器,然后使用通道来监听事件。
// healthCheck runs a routine for check status of the backends every 20 secs
func healthCheck() {
t := time.NewTicker(time.Second * 20)
for {
select {
case <-t.C:
log.Println("Starting health check...")
serverPool.HealthCheck()
log.Println("Health check completed")
}
}
}
最后,我们单独启一个 goroutine 来运行它。
go healthCheck()
总结
-
使用堆来整理存活的后端节点可以减少搜索范围 -
收集统计数据 -
实现加权轮询/最少连接 -
添加配置文件的支持 -
...
如果对负载均衡有兴趣,可以做相应的功能扩展开发。
界世的你当不
只做你的肩膀
无
技术干货|一手资讯|精彩活动
空·
以上是关于Go实现简单负载均衡的主要内容,如果未能解决你的问题,请参考以下文章