PBFT代码实现

Posted 小圣.

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PBFT代码实现相关的知识,希望对你有一定的参考价值。

本篇文章主要是PBFT共识的简单实现,其中有许多地方都做了简化。PBFT的原理已在上篇文章中描述过,如果对PBFT的原理不太清晰的的可以进行查看。文章地址:共识算法学习总结

代码实现的主要功能有:通过客户端添加区块,使用libp2p的mdns进行节点发现,动态的添加节点。

客户端

在启动客户端时,首先根据端口号创建一个客户端,然后启动客户端。

var clientCmd = &cobra.Command
	Use: "client",
	Short: "client manage",
	Run: func(cmd *cobra.Command, args []string) 
		// 获取客户端的端口
		port, err := cmd.Flags().GetInt("port")
		if err != nil 
			log.Println("get param error: ", err)
		
		// 客户端传递的数据
		data, err := cmd.Flags().GetString("data")
		if err != nil
			log.Println("get param error: ", err)
		
		client := NewClient(port)
		client.Start(data)
	,

创建客户端

创建的客户端为libp2p节点,并设置节点的私钥。这里的加密算法使用的是Ed25519算法,不但效率更高并且可以在别的节点获取当前节点的公钥。

// 创建客户端
func NewClient(listenPort int) *Client 
	// 生成密钥对
	r := rand.Reader
	prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
	if err != nil
		log.Println(err)
	
	pubKey := prvKey.GetPublic()
	sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", listenPort))

	// 创建libp2p节点
	h, err := libp2p.New(
		libp2p.ListenAddrs(sourceMultiAddr),
		libp2p.Identity(prvKey),
	)
	if err != nil 
		 log.Println("创建的客户端节点失败:", err)
	

	h.SetStreamHandler(protocol.ID(protocolID), handleStream)

	fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\\n", "0.0.0.0", listenPort, h.ID().Pretty())

	keyPair := Keypair
		privkey: prvKey,
		pubkey: pubKey,
	

	// 创建客户端
	client := &Client
		h,
		keyPair,
		[]KnownNode,
		sync.Mutex,
		make(map[string]*common.ReplyMsg),
	
	
	fmt.Println(">>> 创建客户端成功...")
	return client

启动客户端

客户端启动时,首先不断获取网络中的节点,然后发送request请求,并等待回应。
这里做了简化,在客户端启动时就直接发送request请求。

func (c *Client) Start (data string)  
	fmt.Println(">>> 开始启动客户端...")
	ctx := context.Background()
	// 通过协程获取网络中的节点,使用libp2p的mdns节点发现
	go c.getAllKonwons(c.client)
	// 发送客户端请求
	c.sendRequest(ctx, data)
	// 处理响应
	go c.handleConnection()

	select 

客户端发送请求

客户端首先创建request消息,消息格式为<REQUEST, o, t, c>。o: 请求的具体操作,t: 请求时客户端追加的时间戳,c:客户端标识。REQUEST: 包含消息内容m,以及消息摘要d(m)。客户端对请求进行签名。

客户端创建完request请求后就可以向主节点发送该请求。

func (c *Client) sendRequest(ctx context.Context, data string)  
	fmt.Println(">>> 客户端准备request消息...")
	// 构建request
	req := common.Request
		data,
		hex.EncodeToString(common.GenerateDigest(data)),
	

	// 序列化pubKey
	marshalPubkey, err := crypto.MarshalPublicKey(c.keypair.pubkey)
	sendClient := common.SendClient
		c.client.ID().Pretty(),
		marshalPubkey,
	
	// 构建request消息
	reqMsg := common.RequestMsg
		"solve",
		int(time.Now().Unix()),
		sendClient,
		req,
	

	// 对发送的消息进行签名
	sig, err := c.signMessage(reqMsg)
	if err != nil
		fmt.Printf("%v\\n", err)
	

	// 组合并发送消息
	c.send(ctx, common.ComposeMsg(common.HRequest, reqMsg, sig), c.findPrimaryNode())
	fmt.Println(">>> 客户端发送消息完成...")

数据发送

客户端发送数据时,首先连接到主节点,然后打开与主节点的stream。再打开数据发送的通道,最后序列化数据并发数据添加到发送数据的通道中。

func (c *Client) send(ctx context.Context, msg []byte, node KnownNode) 
	// 开始连接到主节点
	if err := c.client.Connect(ctx, node.h); err != nil
		log.Println(">>> 连接到主节点失败")
	

	// 打开stream
	s, err := c.client.NewStream(context.Background(), node.h.ID, protocol.ID(protocolID))
	if err != nil 
		fmt.Println(">>> 打开stream失败", err)
	
	fmt.Println(">>> 开始连接到: ", node.h)

	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

	// 准备发送数据的通道
	go sendData(rw)

	// 序列化数据
	data, err := json.Marshal(msg)
	if err != nil
		fmt.Println("序列化数据错误", err)
	

	sendDataChan <- data
	close(sendDataChan)

服务端命令行

服务端启动启动时,首先创建一个server,然后启动server。

var serverCmd = &cobra.Command
	Use: "server",
	Short: "server manage",
	Run: func(cmd *cobra.Command, args []string) 
		port, err := cmd.Flags().GetInt("port")
		if err != nil 
			log.Println("get param error: ", err)
		
		// 创建server
		server := NewServer(port)
		// 开始server
		server.start()
	,

创建新的节点

创建服务端与创建客户端类似。

func NewNode(port int) *Node 
	// 生成密钥对
	r := rand.Reader
	prvKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r)
	if err != nil
		log.Println(err)
	
	pubKey := prvKey.GetPublic()
	sourceMultiAddr, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", "127.0.0.1", port))

	// 创建libp2p节点
	h, err := libp2p.New(
		libp2p.ListenAddrs(sourceMultiAddr),
		libp2p.Identity(prvKey),
	)
	if err != nil 
		log.Println(err)
	

	h.SetStreamHandler(protocol.ID(protocolID), handleStream)
	fmt.Printf(">>> 创建客户端p2p节点成功,客户端多路地址是: /ip4/%s/tcp/%v/p2p/%s\\n", "0.0.0.0", port, h.ID().Pretty())

	keyPair := Keypair
		privkey: prvKey,
		pubkey: pubKey,
	

	// 创建node
	return &Node
		[]KnownNode,
		ClientNode,
		0,
		h,
		ViewID,
		make(chan []byte),
		keyPair,
		&MsgLog
			make(map[string]map[string]bool),
			make(map[string]map[string]bool),
			make(map[string]map[string]bool),
			make(map[string]bool),
		,
		make(map[string]*common.RequestMsg),
		sync.Mutex,
	

启动节点

服务端的启动仅仅开启一个消息处理协程。 通过消息处理协程,会把服务端接收到的消息分配到对应的逻辑中进行处理。

func (node *Node)Start()  
	// 处理消息
	go node.handleMsg()


func (node *Node) handleMsg() 
	fmt.Println(">>> 启动节点,等待接收消息...")
	for  

		//  待改进 todo
		rawData := <- receiveChan

		// 反序列得到的数据
		var data []byte
		err := json.Unmarshal([]byte(rawData), &data)
		if err != nil 
			fmt.Println("反序列消息化失败:", err)
			return
		

		// 分割消息,分别处理不同的消息
		header, payload, sign:= common.SplitMsg(data)
		switch header 
		case common.HRequest:
			node.handleRequest(payload, sign)
		case common.HPrePrepare:
			node.handlePrePrepare(payload, sign)
		case common.HPrepare:
			node.handlePrepare(payload, sign)
		case common.HCommit:
			node.handleCommit(payload, sign)
		default:
			fmt.Println("===============无法处理对应的消息============")
		
	

request处理

节点接收到消息后,首先反序列化request消息,然后设置客户端。接着校验request消息的摘要及签名。通过验证后放入请求池,接着创建pre-prepare消息并进行签名。最组合消息进行发送。剩下的三个函数类似,这里就不再叙述。

func (node *Node) handleRequest(payload []byte, sig []byte)  
	fmt.Println(">>> 主节点接收request消息...")
	var request common.RequestMsg
	var prePrepareMsg common.PrePrepareMsg

	// 反序列化请求消息
	err := json.Unmarshal(payload, &request)
	if err != nil
		log.Println("反序列化request错误: ", err)
		return
	

	// 设置节点的客户端
	clientPubKey, err := crypto.UnmarshalPublicKey(request.Client.PubKey)
	if err != nil
		fmt.Println(">>> 反序列化客户端公钥失败", err)
	
	clientNode := ClientNode
		request.Client.ID,
		clientPubKey,
	
	node.clientNode = clientNode


	// 校验request的摘要
	vdig := common.VerifyDigest(request.CRequest.Message, request.CRequest.Digest)
	if vdig == false 
		fmt.Printf("验证摘要错误\\n")
		return
	

	// 校验request的签名
	_, err  = common.VerifySignatrue(request, sig, clientPubKey)
	if err != nil  
		fmt.Printf("验证签名错误:%v\\n", err)
		return
	

	// 添加进请求池
	node.mutex.Lock()
	node.requestPool[request.CRequest.Digest] = &request
	seqID := node.getSequenceID()
	node.mutex.Unlock()

	// 构建pre-Prepare消息
	prePrepareMsg = common.PrePrepareMsg
		request,
		request.CRequest.Digest,
		ViewID,
		seqID,
	

	// 消息签名
	msgSig, err:= node.signMessage(prePrepareMsg)
	if err != nil
		fmt.Printf("%v\\n", err)
		return
	

	// 消息组合
	msg := common.ComposeMsg(common.HPrePrepare, prePrepareMsg, msgSig)

	// 日志处理
	node.mutex.Lock()
	if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil 
		node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)
	
	node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = true
	node.mutex.Unlock()

	// 序列化消息
	data, err := json.Marshal(msg)
	if err != nil
		fmt.Println("序列化request消息出错", err)
		return
	

	fmt.Println(">>> 主节点广播prePrepare消息...")
	// 广播消息
	node.broadcast(data)

pre-prepare消息处理

func (node *Node) handlePrePrepare(payload []byte, sig []byte) 
	fmt.Println(">>> 副节点开始接收prePrepare消息...")
	// 反序列化prePrepare消息
	var prePrepareMsg common.PrePrepareMsg
	err := json.Unmarshal(payload,&prePrepareMsg)
	if err != nil 
		fmt.Printf("error happened:%v", err)
		return
	

	// 找到主节点的公钥
	pnodeId := node.findPrimaryNode()
	pubKey, err := pnodeId.h.ID.ExtractPublicKey()
	if err != nil 
		fmt.Println("获取主节点的公钥失败", err)
		return
	

	// 校验消息签名
	_, err = common.VerifySignatrue(prePrepareMsg, sig, pubKey)
	if err != nil  
		fmt.Printf("验证主节点签名错误:%v\\n", err)
		return
	

	// 校验消息的摘要
	if prePrepareMsg.Digest != prePrepareMsg.Request.CRequest.Digest 
		fmt.Printf("校验摘要错误\\n")
		return
	

	node.mutex.Lock()
	node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Request
	node.mutex.Unlock()

	// 校验request的摘要
	err = node.verifyRequestDigest(prePrepareMsg.Digest)
	if err != nil
		fmt.Printf("%v\\n", err)
		return
	

	node.mutex.Lock()
	node.requestPool[prePrepareMsg.Request.CRequest.Digest] = &prePrepareMsg.Request
	node.mutex.Unlock()

	err = node.verifyRequestDigest(prePrepareMsg.Digest)
	if err != nil
		fmt.Printf("%v\\n", err)
		return
	

	node.mutex.Lock()
	if node.msgLog.preprepareLog[prePrepareMsg.Digest] == nil 
		node.msgLog.preprepareLog[prePrepareMsg.Digest] = make(map[string]bool)
	
	node.msgLog.preprepareLog[prePrepareMsg.Digest][node.node.ID().String()] = true
	node.mutex.Unlock()

	// 构建prePare消息
	prepareMsg := common.PrepareMsg
		prePrepareMsg.Digest,
		ViewID以上是关于PBFT代码实现的主要内容,如果未能解决你的问题,请参考以下文章

PBFT代码实现

PBFT算法java实现

RAFT与PBFT

PBFT && RBFT算法流程

对PBFT算法的理解

对PBFT算法的理解