Zookeeper 的 Golang 客户端

Posted thepoy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper 的 Golang 客户端相关的知识,希望对你有一定的参考价值。

使用 docker 创建的三个 Zookeeper 服务端组成的集群,其 ip 地址分别为:

  • 172.17.0.2
  • 172.17.0.3
  • 172.17.0.4

一、增删改查

1 增 / create

创建新节点一共有四种:

  • 持久节点
  • 临时节点
  • 持久时序节点
  • 临时时序节点

代码:

package main

import (
  ...
    "github.com/go-zookeeper/zk"
)

func main() {
    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // 创建持久节点
    path, err := conn.Create("/hello", []byte("world"), 0, zk.WorldACL(zk.PermAll))
    if err != nil {
        panic(err)
    }
    println("Created", path)

    // 创建临时节点,创建此节点的会话结束后立即清除此节点
    ephemeral, err := conn.Create("/ephemeral", []byte("1"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
    if err != nil {
        panic(err)
    }
    println("Ephemeral node created:", ephemeral)

    // 创建持久时序节点
    sequence, err := conn.Create("/sequence", []byte("1"), zk.FlagSequence, zk.WorldACL(zk.PermAll))
    if err != nil {
        panic(err)
    }
    println("Sequence node created:", sequence)

    // 创建临时时序节点,创建此节点的会话结束后立即清除此节点
    ephemeralSequence, err := conn.Create("/ephemeralSequence", []byte("1"), zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll))
    if err != nil {
        panic(err)
    }
    println("Ephemeral-Sequence node created:", ephemeralSequence)
}

2 查 / get

增完了节点,接下来当然是查看一下节点信息:

package main

import (
    ...
  "github.com/go-zookeeper/zk"
)

func main() {
    ...
    result, state, err := conn.Get("/hello")
    if err != nil {
        panic(err)
    }
    fmt.Println("result: ", string(result))
    fmt.Println("state ->")
    fmt.Printf("cZxid=%d\\nctime=%d\\nmZxid=%d\\nmtime=%d\\npZxid=%d\\ncversion=%d\\ndataVersion=%d\\naclVersion=%d\\nephemeralOwner=%v\\ndataLength=%d\\nnumChildren=%d\\n", state.Czxid, state.Ctime, state.Mzxid, state.Mtime, state.Pzxid, state.Cversion, state.Version, state.Aversion, state.EphemeralOwner, state.DataLength, state.NumChildren)
}

结果:

result:  world
state ->
cZxid=4294967345
ctime=1618569545037
mZxid=4294967345
mtime=1618569545037
pZxid=4294967345
cversion=0
dataVersion=0
aclVersion=0
ephemeralOwner=0
dataLength=5
numChildren=0

3 改 / set

代码:

package main

import (
    ...
    "github.com/go-zookeeper/zk"
)

func main() {
    ...    
    path := "/hello"
    _, state, _ := conn.Get(path)

    state, err = conn.Set(path, []byte("girl"), state.Version)
    if err != nil {
        panic(err)
    }
    fmt.Println("state ->")
    fmt.Printf("cZxid=%d\\nctime=%d\\nmZxid=%d\\nmtime=%d\\npZxid=%d\\ncversion=%d\\ndataVersion=%d\\naclVersion=%d\\nephemeralOwner=%v\\ndataLength=%d\\nnumChildren=%d\\n", state.Czxid, state.Ctime, state.Mzxid, state.Mtime, state.Pzxid, state.Cversion, state.Version, state.Aversion, state.EphemeralOwner, state.DataLength, state.NumChildren)
  
    data, _, err := conn.Get(path)
    if err != nil {
        panic(err)
    }
    fmt.Println("\\nnew value: ", string(data))
}

结果:

state ->
cZxid=4294967345
ctime=1618569545037
mZxid=4294967372
mtime=1618575729297
pZxid=4294967345
cversion=0
dataVersion=1
aclVersion=0
ephemeralOwner=0
dataLength=4
numChildren=0

new value:  girl

4 删 / delete

代码:

package main

import (
    ...
    "github.com/go-zookeeper/zk"
)

func main() {
    ...
    path := "/hello"
    exists, state, err := conn.Exists(path)
    fmt.Printf("\\npath[%s] exists: %v\\n", path, exists)

    err = conn.Delete(path, state.Version)
    if err != nil {
        panic(err)
    }
    fmt.Printf("path[%s] is deleted.", path)

    exists, _, err = conn.Exists(path)
    fmt.Printf("\\npath[%s] exists: %v\\n", path, exists)
}

结果:

path[/hello] exists: true
path[/hello] is deleted.
path[/hello] exists: false

二、权限 / ACL

zk 的节点有 5 种权限:CREATE、READ、WRITE、DELETE 和 ADMIN。

ACL 权限由 scheme:id:permissions 组成。

scheme有 4 种方式:

  • world
  • auth
  • digest
  • ip

下面对这 4 种方式都测试一遍。

1 world

默认方式,相当于全世界都能访问。

/test节点的权限修改为 crwa 后尝试删除其子节点 /1

func main() {
    ...
    // get acl
    acl, state, err := conn.GetACL("/test")
    if err != nil {
        panic(err)
    }
    fmt.Println("\\nget acl:")
    fmt.Println("scheme =", acl[0].Scheme)
    fmt.Println("id =", acl[0].ID)
    fmt.Println("permissions =", acl[0].Perms)

    // set acl
    perms := zk.PermCreate | zk.PermRead | zk.PermWrite | zk.PermAdmin // crwa 权限
    state, err = conn.SetACL("/test", zk.WorldACL(int32(perms)), state.Version)
    if err != nil {
        panic(err)
    }
    fmt.Println("SetAcl successful.")

    // create child node
    _, err = conn.Create("/test/1", []byte("1"), 0, zk.WorldACL(zk.PermAll))
    if err != nil {
        panic(err)
    }

    // get child node
    _, state, err = conn.Get("/test/1")
    if err != nil {
        panic(err)
    }

    // delete child node /1
    err = conn.Delete("/test/1", state.Version)
    if err != nil {
        fmt.Println("delete failed: ", err.Error())
        os.Exit(1)
    }
}

结果:

get acl:
scheme = world
id = anyone
permissions = 31
SetAcl successful.
delete failed:  zk: not authenticated
exit status 1

可以看见,因为权限问题无法删除子节点/1,即使子节点/1赋予的是全部权限。

第 1 小节除了测试 world 外,还对 ACL 权限的设置方法进行了初步的探索。

2 auth

auth 用来授予用户权限,所以需要先创建用户。

为不存在的用户授权

func main() {
    ...
    _, state, err := conn.Get("/test")
    if err != nil {
        panic(err)
    }

    acl := zk.ACL{
        Perms:  31, // cdrwa
        Scheme: "auth",
        ID:     "user:123456", // 不存在的用户
    }

    // 为不存在的用户授权
    _, err = conn.SetACL("/test", []zk.ACL{acl}, state.Version)
    if err != nil {
        panic(err)
    }
}

$ go run main.go
panic: zk: invalid ACL specified

节点对指定用户授权

func main() {
    ...
    _, state, err := conn.Get("/test")
    if err != nil {
        panic(err)
    }

    // 用户授权,用户不存在的话会新建
    err = conn.AddAuth("digest", []byte("user1:123456"))
    if err != nil {
        panic(err)
    }
    
    acl := zk.ACL{
        Perms:  31, // cdrwa,也可以用zk权限位或计算
        Scheme: "auth",
        ID:     "user1:123456", // 用户名和密码,密码明文或密文皆可
    }
    
    // 为用户授权
    _, err = conn.SetACL("/test", []zk.ACL{acl}, state.Version)
    if err != nil {
        panic(err)
    }
    fmt.Println("节点[/test]已对用户 user1 授权")
}

$ go run main.go
节点[/test]已对用户 user1 授权

授权完成后,需要验证用户认证信息才能进行下一步操作。

使用未授权用户(如 world)访问

func main() {
    data, _, err := conn.Get("/test")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data))
}

$ go run main.go
panic: zk: not authenticated

如果使用不正确的用户名和密码,得到的会是同样的用户认证失败的结果。

使用指定用户访问

func main() {
    // 每个会话都要验证授权
    err = conn.AddAuth("digest", []byte("user1:123456"))
    if err != nil {
        panic(err)
    }
    
    // 查询节点内容
    data, _, err := conn.Get("/test")
    if err != nil {
        panic(err)
    }
    fmt.Println("节点[/test] 存储的内容:", string(data))

    // 查询节点 acl
    acl, _, err := conn.GetACL("/test")
    if err != nil {
        panic(err)
    }
    fmt.Println("acl 信息:")
    fmt.Println("scheme =", acl[0].Scheme)
    fmt.Println("id =", acl[0].ID)
    fmt.Println("permissions =", acl[0].Perms)
}
}

$ go run main.go
节点[/test] 存储的内容: test
acl 信息:
scheme = digest
id = user1:HYGa7IZRm2PUBFiFFu8xY2pPP/s=
permissions = 31

可以看到,在查询节点的 acl 信息时返回的用户密码是加密过的密文。

3 digest

digestauth基本相同,唯一的区别在于设置权限时,密码需要使用密文。

zk golang 库中有专为digest构造的方法:

zk.DigestACL(perms int32, user, password string)

此方法传入的密码需要是明文,其内部逻辑会将明文转为密文再向 zookeeper 传递。

使用示例:

conn.SetACL("/test", zk.DigestACL(31, "user1", "123456"), 0)

4 ip

ip 权限顾名思义,就是限制 ip 地址的访问权限。

把节点的权限设置给指定的 ip 地址后,其他 ip 将无法访问该节点。

设置指定 ip

func main() {
    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    _, err = conn.SetACL("/ip", ipACL(31, "172.17.0.1"), 0)
    if err != nil {
        panic(err)
    }
    fmt.Println("节点[/ip] 已设置 ip 权限")
}

其他 ip 访问此节点

新建了一个 docker 容器,ip 地址为 172.17.0.5,访问此节点时:

    data, _, err := conn.Get("/ip")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(data))
panic: zk: not authenticated

报认证错误。

使用指定 ip 访问此节点

    data, _, err := conn.Get("/ip")
    if err != nil {
        panic(err)
    }
    fmt.Println("节点[/ip] 存储的内容:", string(data))

    acl, _, err := conn.GetACL("/ip")
    if err != nil {
        panic(err)
    }
    fmt.Println("\\nacl 信息:")
    fmt.Println("scheme =", acl[0].Scheme)
    fmt.Println("id =", acl[0].ID)
    fmt.Println("permissions =", acl[0].Perms)

结果:

节点[/ip] 存储的内容:ip

acl 信息:
scheme = ip
id = 172.17.0.1
permissions = 31

三、监听 / watch

watch 用来实现发布/订阅功能,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者。

每个 watch 仅有一次触发的机会,一旦触发会立即失效,想要持续监听,就需要一直注册。

go-zookeeper 中的监听机制是通过事件/ Event 完成的。

1 监听一个节点

1.1 全局监听

将监听器放到Connect函数中,如果有监听事件发生,会一直执行监听器的回调函数。

示例代码:

func callback(e zk.Event) {
    fmt.Println("++++++++++++++++++++++++")
    fmt.Println("path:", e.Path)
    fmt.Println("type:", e.Type.String())
    fmt.Println("state:", e.State.String())
    fmt.Println("------------------------")
}

func main() {
    eventCallbackOption := zk.WithEventCallback(callback)

    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second, eventCallbackOption)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // 注册一个 watch
    exists, state, _, err := conn.ExistsW("/watch")
    if err != nil {
        panic(err)
    }

    if !exists {
        // 创建 /watch 时,触发监听事件,watch 失效
        _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if err != nil {
            panic(err)
        }

        // 再注册一个 watch
        _, state, _, err = conn.ExistsW("/watch")
        if err != nil {
            panic(err)
        }
    }

    // 删除 /watch 时,触发监听事件,watch 失效
    err = conn.Delete("/watch", state.Version)
    if err != nil {
        panic(err)
    }
}

结果:

++++++++++++++++++++++++
path: 
type: EventSession
state: StateConnecting
------------------------
++++++++++++++++++++++++
path: 
type: EventSession
state: StateConnected
------------------------
2021/04/18 16:57:11 connected to 172.17.0.2:2181
++++++++++++++++++++++++
path: 
type: EventSession
state: StateHasSession
------------------------
2021/04/18 16:57:11 authenticated: id=72057651018596414, timeout=4000
2021/04/18 16:57:11 re-submitting `0` credentials after reconnect
++++++++++++++++++++++++
path: /watch
type: EventNodeCreated
state: Unknown
------------------------
++++++++++++++++++++++++
path: /watch
type: EventNodeDeleted
state: Unknown
------------------------
2021/04/18 16:57:11 recv loop terminated: EOF
2021/04/18 16:57:11 send loop terminated: <nil>
++++++++++++++++++++++++
path: 
type: EventSession
state: StateDisconnected
------------------------

1.2 只监听部分事件

对于不需要设置全局监听器的场景,需要对事件 channel 进行操作,watch 使用一次就失效。

func main() {

    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    // 注册一个 watch
    exists, _, eventChannel, err := conn.ExistsW("/watch")
    if err != nil {
        panic(err)
    }

    go func() {
        // 从事件 channel 中取出事件
        e := <-eventChannel
        fmt.Println("++++++++++++++++++++++++")
        fmt.Println("path:", e.Path)
        fmt.Println("type:", e.Type.String())
        fmt.Println("state:", e.State.String())
        fmt.Println("------------------------")
    }()

    if !exists {
        // 创建 /watch 时,触发监听事件,watch 失效
        _, err = conn.Create("/watch", []byte("watch"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if err != nil {
            panic(err)
        }
    }
}

2 监听子节点

TODO: 监听子节点时服务端返回的事件类型可能不正确,待完善

四、客户端随机 host

func main() {
    hosts := []string{"172.17.0.2:2181", "172.17.0.3:2181", "172.17.0.4:2181"}
    hostProvider := new(zk.DNSHostProvider)

    err := hostProvider.Init(hosts)
    if err != nil {
        panic(err)
    }

    host, retry := hostProvider.Next() // 获得host
    fmt.Println(host, retry)

    time.Sleep(10 * time.Second) // 做一些事情

    hostProvider.Connected() // 将使用过的 host 放到 host_list 最后
}

zk.Connect函数在执行时,conn内的私有属性hostProvider已经对传的 host 切片做了 conn.hostProvider.Init(srvs) 处理。

但同时,也可以通过WithHostProvider函数替换默认的hostProvider

五、分布式锁

go-zookeeper 添加分布式锁的方法为NewLock(c *Conn, path string, acl []ACL)

锁的结构体为:

type Lock struct {
    c        *Conn
    path     string
    acl      []ACL
    lockPath string
    seq      int
}

这个结构体实现了三个方法:Lock()LockWithData(data []byte)Unlock()

Lock()LockWithData(data []byte)中传入了空参数:

func (l *Lock) Lock() error {
    return l.LockWithData([]byte{})
}

1 原理

zookeeper的分布式锁可以利用每个节点的唯一性来完成,但所有服务监听一个节点对于分布式系统来说完全是资源浪费。

zookeeper可以利用临时时序/临时顺序节点来创建一个有序的临时节点列表来完成分布式锁:

服务 A 创建了节点 a,此时节点 a 的前面没有节点,所以服务 A 可以执行。此时服务 B 创建了节点 b,节点 b 是节点 a 的下一个节点,那么服务 B 只需要监听节点 a 即可。

也就是说,因为临时有序节点列表是有序的,所以每个服务只需要监听自己创建的节点的前一个节点即可。

2 示例代码

下面是 50 个 Gorouine 进行抢锁的示例:

func main() {
    conn, _, err := zk.Connect([]string{"172.17.0.2", "172.17.0.3", "172.17.0.4"}, time.Second)
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    var wg sync.WaitGroup

    for i := 0; i < 50; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            lock := zk.NewLock(conn, "/root/lock", zk.WorldACL(zk.PermAll))
            err = lock.LockWithData([]byte("it is a lock"))
            if err != nil {
                panic(err)
            }
            fmt.Println("第", n, "个 goroutine 获取到了锁")
            time.Sleep(time.Second) // 1 秒后释放锁

            lock.Unlock()
        }(i)
    }

    wg.Wait()
}

结果:

第 49 个 goroutine 获取到了锁
第 12 个 goroutine 获取到了锁
第 13 个 goroutine 获取到了锁
第 11 个 goroutine 获取到了锁
...
第 28 个 goroutine 获取到了锁
第 30 个 goroutine 获取到了锁
第 26 个 goroutine 获取到了锁
第 31 个 goroutine 获取到了锁

以上是关于Zookeeper 的 Golang 客户端的主要内容,如果未能解决你的问题,请参考以下文章

docker 配置 kafka+zookeeper,golang接入示例

Golang之微服务为什么发现不了

[Golang] kafka集群搭建和golang版生产者和消费者

Golang之发送消息至kafka

2022-02-25:k8s安装zookeeper,yaml如何写?找份北京的golang后端工作,35岁,有人收我吗?

Zookeeper客户端java代码操作