PBFT代码实现
Posted 小圣.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PBFT代码实现相关的知识,希望对你有一定的参考价值。
本篇文章主要是PBFT共识的简单实现,其中有许多地方都做了简化。PBFT的原理已在上篇文章中描述过,如果对PBFT的原理不太清晰的的可以进行查看。文章地址:共识算法学习总结。
代码实现的主要功能有:通过客户端添加区块,使用libp2p的mdns进行节点发现,动态的添加节点。
客户端
在启动客户端时,首先根据端口号创建一个客户端,然后启动客户端。
var clientCmd = &cobra.Command
Use: "client",
Short: "client manage",
Run: func(cmd *cobra.Command, args []string)
// 获取客户端的端口
port, err := cmd.Flags().GetInt("port")
if err != nil
log.Println("get param error: ", err)
// 客户端传递的数据
data, err := cmd.Flags().GetString("data")
if err != nil
log.Println("get param error: ", err)
client := NewClient(port)
client.Start(data)
,
创建客户端
创建的客户端为libp2p节点,并设置节点的私钥。这里的加密算法使用的是Ed25519算法,不但效率更高并且可以在别的节点获取当前节点的公钥。
// 创建客户端
func NewClient(listenPort int) *Client
// 生成密钥对
r := rand.Reader
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
if err != nil
log.Println(err)
pubKey := prvKey.GetPublic()
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", listenPort))
// 创建libp2p节点
h, err := libp2p.New(
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(prvKey),
)
if err != nil
log.Println("创建的客户端节点失败:", err)
h.SetStreamHandler(protocol.ID(protocolID), handleStream)
fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\\n", "0.0.0.0", listenPort, h.ID().Pretty())
keyPair := Keypair
privkey: prvKey,
pubkey: pubKey,
// 创建客户端
client := &Client
h,
keyPair,
[]KnownNode,
sync.Mutex,
make(map[string]*common.ReplyMsg),
fmt.Println(">>> 创建客户端成功...")
return client
启动客户端
客户端启动时,首先不断获取网络中的节点,然后发送request请求,并等待回应。
这里做了简化,在客户端启动时就直接发送request请求。
func (c *Client) Start (data string)
fmt.Println(">>> 开始启动客户端...")
ctx := context.Background()
// 通过协程获取网络中的节点,使用libp2p的mdns节点发现
go c.getAllKonwons(c.client)
// 发送客户端请求
c.sendRequest(ctx, data)
// 处理响应
go c.handleConnection()
select
客户端发送请求
客户端首先创建request消息,消息格式为<REQUEST, o, t, c>。o: 请求的具体操作,t: 请求时客户端追加的时间戳,c:客户端标识。REQUEST: 包含消息内容m,以及消息摘要d(m)。客户端对请求进行签名。
客户端创建完request请求后就可以向主节点发送该请求。
func (c *Client) sendRequest(ctx context.Context, data string)
fmt.Println(">>> 客户端准备request消息...")
// 构建request
req := common.Request
data,
hex.EncodeToString(common.GenerateDigest(data)),
// 序列化pubKey
marshalPubkey, err := crypto.MarshalPublicKey(c.keypair.pubkey)
sendClient := common.SendClient
c.client.ID().Pretty(),
marshalPubkey,
// 构建request消息
reqMsg := common.RequestMsg
"solve",
int(time.Now().Unix()),
sendClient,
req,
// 对发送的消息进行签名
sig, err := c.signMessage(reqMsg)
if err != nil
fmt.Printf("%v\\n", err)
// 组合并发送消息
c.send(ctx, common.ComposeMsg(common.HRequest, reqMsg, sig), c.findPrimaryNode())
fmt.Println(">>> 客户端发送消息完成...")
数据发送
客户端发送数据时,首先连接到主节点,然后打开与主节点的stream。再打开数据发送的通道,最后序列化数据并发数据添加到发送数据的通道中。
func (c *Client) send(ctx context.Context, msg []byte, node KnownNode)
// 开始连接到主节点
if err := c.client.Connect(ctx, node.h); err != nil
log.Println(">>> 连接到主节点失败")
// 打开stream
s, err := c.client.NewStream(context.Background(), node.h.ID, protocol.ID(protocolID))
if err != nil
fmt.Println(">>> 打开stream失败", err)
fmt.Println(">>> 开始连接到: ", node.h)
rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
// 准备发送数据的通道
go sendData(rw)
// 序列化数据
data, err := json.Marshal(msg)
if err != nil
fmt.Println("序列化数据错误", err)
sendDataChan <- data
close(sendDataChan)
服务端命令行
服务端启动启动时,首先创建一个server,然后启动server。
var serverCmd = &cobra.Command
Use: "server",
Short: "server manage",
Run: func(cmd *cobra.Command, args []string)
port, err := cmd.Flags().GetInt("port")
if err != nil
log.Println("get param error: ", err)
// 创建server
server := NewServer(port)
// 开始server
server.start()
,
创建新的节点
创建服务端与创建客户端类似。
func NewNode(port int) *Node
// 生成密钥对
r := rand.Reader
prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
if err != nil
log.Println(err)
pubKey := prvKey.GetPublic()
sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", port))
// 创建libp2p节点
h, err := libp2p.New(
libp2p.ListenAddrs(sourceMultiAddr),
libp2p.Identity(prvKey),
)
if err != nil
log.Println(err)
h.SetStreamHandler(protocol.ID(protocolID), handleStream)
fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\\n", "0.0.0.0", port, h.ID().Pretty())
keyPair := Keypair
privkey: prvKey,
pubkey: pubKey,
// 创建node
return &Node
[]KnownNode,
ClientNode,
0,
h,
ViewID,
make(chan []byte),
keyPair,
&MsgLog
make(map[string]map[string]bool),
make(map[string]map[string]bool),
make(map[string]map[string]bool),
make(map[string]bool),
,
make(map[string]*common.RequestMsg),
sync.Mutex,
启动节点
服务端的启动仅仅开启一个消息处理协程。 通过消息处理协程,会把服务端接收到的消息分配到对应的逻辑中进行处理。
func (node *Node)Start()
// 处理消息
go node.handleMsg()
func (node *Node) handleMsg()
fmt.Println(">>> 启动节点,等待接收消息...")
for
// 待改进 todo
rawData := <- receiveChan
// 反序列得到的数据
var data []byte
err := json.Unmarshal([]byte(rawData), &data)
if err != nil
fmt.Println("反序列消息化失败:", err)
return
// 分割消息,分别处理不同的消息
header, payload, sign:= common.SplitMsg(data)
switch header
case common.HRequest:
node.handleRequest(payload, sign)
case common.HPrePrepare:
node.handlePrePrepare(payload, sign)
case common.HPrepare:
node.handlePrepare(payload, sign)
case common.HCommit:
node.handleCommit(payload, sign)
default:
fmt.Println("===============无法处理对应的消息============")
request处理
节点接收到消息后,首先反序列化request消息,然后设置客户端。接着校验request消息的摘要及签名。通过验证后放入请求池,接着创建pre-prepare消息并进行签名。最组合消息进行发送。剩下的三个函数类似,这里就不再叙述。
func (node *Node) handleRequest(payload []byte, sig []byte)
fmt.Println(">>> 主节点接收request消息...")
var request common.RequestMsg
var prePrepareMsg common.PrePrepareMsg
// 反序列化请求消息
err := json.Unmarshal(payload, &request)
if err != nil
log.Println("反序列化request错误: ", err)
return
// 设置节点的客户端
clientPubKey, err := crypto.UnmarshalPublicKey(request.Client.PubKey)
if err != nil
fmt.Println(">>> 反序列化客户端公钥失败", err)
clientNode := ClientNode
request.Client.ID,
clientPubKey,
node.clientNode = clientNode
// 校验request的摘要
vdig := common.VerifyDigest(request.CRequest.Message, request.CRequest.Digest)
if vdig == false
fmt.Printf("验证摘要错误\\n")
return
// 校验request的签名
_, err = common.VerifySignatrue(request, sig, clientPubKey)
if err != nil
fmt.Printf("验证签名错误:%v\\n", err)
return
// 添加进请求池
node.mutex.Lock()
node.requestPool[request.CRequest.Digest] = &request
seqID := node.getSequenceID()
node.mutex.Unlock()
// 构建pre-Prepare消息
prePrepareMsg = common.PrePrepareMsg
request,
request.CRequest.Digest,
ViewID,
seqID,
// 消息签名
msgSig, err:= node.signMessage(prePrepareMsg)
if err != nil
fmt.Printf("%v\\n", err)
return
// 消息组合
msg := common.ComposeMsg(common.HPrePrepare, prePrepareMsg, msgSig)
// 日志处理
node.mutex.Lock()
if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil
node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)
node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = true
node.mutex.Unlock()
// 序列化消息
data, err := json.Marshal(msg)
if err != nil
fmt.Println("序列化request消息出错", err)
return
fmt.Println(">>> 主节点广播prePrepare消息...")
// 广播消息
node.broadcast(data)
pre-prepare消息处理
func (node *Node) handlePrePrepare(payload []byte, sig []byte)
fmt.Println(">>> 副节点开始接收prePrepare消息...")
// 反序列化prePrepare消息
var prePrepareMsg common.PrePrepareMsg
err := json.Unmarshal(payload,&prePrepareMsg)
if err != nil
fmt.Printf("error happened:%v", err)
return
// 找到主节点的公钥
pnodeId := node.findPrimaryNode()
pubKey, err := pnodeId.h.ID.ExtractPublicKey()
if err != nil
fmt.Println("获取主节点的公钥失败", err)
return
// 校验消息签名
_, err = common.VerifySignatrue(prePrepareMsg, sig, pubKey)
if err != nil
fmt.Printf("验证主节点签名错误:%v\\n", err)
return
// 校验消息的摘要
if prePrepareMsg.Digest != prePrepareMsg.Request.CRequest.Digest
fmt.Printf("校验摘要错误\\n")
return
node.mutex.Lock()
node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Request
node.mutex.Unlock()
// 校验request的摘要
err = node.verifyRequestDigest(prePrepareMsg.Digest)
if err != nil
fmt.Printf("%v\\n", err)
return
node.mutex.Lock()
node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Request
node.mutex.Unlock()
err = node.verifyRequestDigest(prePrepareMsg.Digest)
if err != nil
fmt.Printf("%v\\n", err)
return
node.mutex.Lock()
if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil
node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)
node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = true
node.mutex.Unlock()
// 构建prePare消息
prepareMsg := common.PrepareMsg
prePrepareMsg.Digest,
ViewID以上是关于PBFT代码实现的主要内容,如果未能解决你的问题,请参考以下文章