[Linux]以匹配系统为例入门Thrift框架

Posted 鱼竿钓鱼干

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Linux]以匹配系统为例入门Thrift框架相关的知识,希望对你有一定的参考价值。

[Linux]以匹配系统为例入门Thrift框架

参考博客/文章:
Apache Thrift系列详解(一) - 概述与入门
acwing Linux 基础课(主要参考这里的教程)
RPC是什么,看完你就知道了
thrift官网
c++并行编程速成

什么是Thrift

Thrift是一个轻量级、跨语言的远程服务调用框架。它通过自身的IDL中间语言, 并借助代码生成引擎生成各种主流语言的RPC服务端/客户端模板代码。
RPC(Remote Procedure Call)远程过程调用协议,一种通过网络从远程计算机上请求服务,而不需要了解底层网络技术的协议。

简单来说就是方便你调用远程服务器上的程序

通过游戏匹配案例学习Thrift框架

借助acwing提供的服务器来实现游戏匹配服务

任务列表:

  • 对接口进行描述
  • 实现游戏节点(Python编写,match_client端)
  • 实现匹配系统节点(C++编写,match_server端,save_client端口)

match匹配服务save数据保存服务,就是实现两个服务的客户端和服务端

数据存储中心(save_server端口)acwing已经帮忙实现,使用先前的myserver服务器即可

Step.1 创建相应项目文件

gitlab上创个项目,然后在该项目文件夹下方创建三个文件目录
match_system:匹配系统
game:游戏
thrift:接口描述

Step.2 创建接口描述文件

两个接口文件

match.thrift
为匹配客户端提供add_user接口和remove_user接口

save.thrift
为匹配服务端提供把数据发送到myserver服务器(数据存储服务器)并保存的save_data接口

进入thrift目录
将下面内容输入到match.thrift

namespace cpp match_service

struct User{
    1:i32 id,
    2:string name,
    3:i32 score
}

service Match{
    i32 add_user(1:User user,2:string info),
    
    i32 remove_user(1:User user,2:string info),
}

第一行namespace 说明使用的语言,我们选择c++,这样thrift文件的语法就基本和c++一样了,有几个不同的点是int要写成i32,变量参数之前要有标号
我们写个结构体User并作为参数描述两个接口add_userremove_user

将下面内容输入到save.thrift

namespace cpp save_service

service Save{
    #username:myserver名称 用于数据保存的服务器
    #password:myserver的密码的md5sum前8
    #验证成功返回1,否则返回0
    i32 save_data(1:string username,2:string password,3:i32 player1_id,4:i32 player2_id)
}

username和password用于验证
使用md5sum加工服务器密码,便于代码公开和保证安全

Step.3 创建匹配客户端文件

我们使用python编写游戏匹配客户端的add_userremove_user功能

任务:

  1. 我们要根据输入判断是要add还是remove,还要传达用户相关参数。
  2. 我们要借助框架让客户端调用服务端的程序

使用下面的命令在当前文件夹下生成框架

thrift -r --gen py tutorial.thrift

tutorial.thrift要换成自己的match.thrift位置

生成了gen-py后重命名然后找到客户端的py文件俺需求修改(头文件和相关功能)
因为我们是写客户端,所以要注意删除服务端相关的文件(python不删可以,下面的c++不行,具体看thrift官网来区分)

client.py文件例子

from match_client.match import Match
from match_client.match.ttypes import User
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from sys import stdin

def operate(op,user_id,username,score):
    # Make socket
    transport = TSocket.TSocket('localhost', 9090)

    # Buffering is critical. Raw sockets are very slow
    transport = TTransport.TBufferedTransport(transport)

    # Wrap in a protocol
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    # Create a client to use the protocol encoder
    client = Match.Client(protocol)

    # Connect!
    transport.open()
    user=User(user_id,username,score)
    if op=="add":
        client.add_user(user,"")
    elif op=="remove":
        client.remove_user(user,"")
    # Close!
    transport.close()

def main():
    for line in stdin:
        op,user_id,username,score=line.split(" ")
        operate(op,int(user_id),username,int(score))

if __name__=="__main__":
    main()

Step.4 创建匹配服务端文件

我们使用c++编写游戏匹配服务端

任务:

  1. 处理add_userremove_user调用
  2. 为玩家进行匹配

Part.1 框架生成

同理使用类似的操作生成thrift的cpp框架,并修改其中的内容(头文件和功能实现)

thrift -r --gen cpp tutorial.thrift

Part.2 处理接口调用和匹配

我们发现处理需要同时处理两个人物:接口调用和玩家匹配,可以考虑到这里有个并行的需求。
接下来需要了解几个概念

匹配池

我们创建一个Pool类,用来描述一个匹配池
把用户作为数学,定义save_result,add_user,remove_user,match等方法

生产者-消费者

并行设计模式:生产者-消费者
我们把两个任务分别看作生产者和消费者
生产者:add_userremove_user的接口调用
消费者:匹配工作

我们将接口调用加入到一个消息队列( Message_Queue )当中
并根据消息队列的情况进行匹配

并行编程

由于两个任务同时进行,因此需要采用并行编程的方式。
通过调用thread相关库实现多线程,然后使用mutex(锁)来解决并行编程中的资源争夺问题。
由于并行编程本人还不是很了解,所以这里不过多谈及

Part.3 匹配方式的完善

我们最开始写了个最原始的匹配方式:匹配池超过两名玩家就匹配。然而我们平常玩的游戏的匹配机制没有那么容易。我们可以考虑添加玩家等待时间的参数,根据等待时间改变匹配范围,实现更加真实的匹配。

Part.4 单线程框架To多线程框架

指令生成的初始框架为单线程框架,我们参照官网的案例将其修改为多线程框架,一次加速匹配服务。

下面是匹配服务端的成品代码

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/TToString.h>


#include<iostream>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<queue>
#include<vector>
#include<unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace  ::match_service;
using namespace  ::save_service;
using namespace std;

struct Task{
    User user;
    string type;
};

struct MessageQueue{
    queue<Task> q;
    mutex m;
    condition_variable cv;
}message_queue;

class Pool{
    public:
        void save_result(int a,int b){
            printf("Match Result:%d vs %d !!!\\n",a,b);
            std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
            std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
            std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
            SaveClient client(protocol);

            try {
                transport->open();

                client.save_data("acs_152","1c915010",a,b);

                transport->close();
            } catch (TException& tx) {
                cout << "ERROR: " << tx.what() << endl;
            }
        }
        bool check_match(uint32_t i, uint32_t j)
        {
            auto a = users[i], b = users[j];

            int dt = abs(a.score - b.score);
            int a_max_dif = wt[i] * 50;
            int b_max_dif = wt[j] * 50;

            return dt <= a_max_dif && dt <= b_max_dif;
        }

        void match(){
            for(uint32_t i = 0 ; i < wt.size() ; i ++ ){
                wt[i]++;
            }
            while(users.size()>1){
                sort(users.begin(),users.end(),[&](User& a,User b){
                        return a.score<b.score;
                        });
                bool flag=true;
                for(uint32_t i = 0; i < users.size(); i ++ ){
                    for(uint32_t j = i+1; j < users.size() ; j ++ ){
                        if(check_match(i,j)){
                            auto a=users[i],b=users[j];
                            users.erase(users.begin()+j);
                            users.erase(users.begin()+i);

                            wt.erase(wt.begin()+j);
                            wt.erase(wt.begin()+i);

                            save_result(a.id,b.id);
                            flag=false;
                            break;
                        }
                    }
                    if(flag)break;
                }
                if(flag)break;
            }
        }
        void add(User user){
            users.push_back(user);
            wt.push_back(0);
        }
        void remove(User user){
            for (uint32_t i=0;i<users.size();i++)
                if(users[i].id==user.id){
                    users.erase(users.begin()+i);
                    wt.erase(wt.begin()+i);
                    break;
                }
        }

    private:
        vector<User>users;
        vector<int> wt;//等待时间,单位秒
}pool;
class MatchHandler : virtual public MatchIf {
    public:
        MatchHandler() {
            // Your initialization goes here
        }

        int32_t add_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("add_user\\n");
            unique_lock<mutex>lck(message_queue.m);
            message_queue.q.push({user,"add"});
            message_queue.cv.notify_all();
            return 0;
        }

        int32_t remove_user(const User& user, const std::string& info) {
            // Your implementation goes here
            printf("remove_user\\n");

            unique_lock<mutex>lck(message_queue.m);
            message_queue.q.push({user,"remove"});
            message_queue.cv.notify_all();
            return 0;
        }

};

class MatchCloneFactory : virtual public MatchIfFactory {
    public:
        ~MatchCloneFactory() override = default;
        MatchIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override
        {
            std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
            /*cout << "Incoming connection\\n";
            cout << "\\tSocketInfo: "  << sock->getSocketInfo() << "\\n";
            cout << "\\tPeerHost: "    << sock->getPeerHost() << "\\n";
            cout << "\\tPeerAddress: " << sock->getPeerAddress() << "\\n";
            cout << "\\tPeerPort: "    << sock->getPeerPort() << "\\n";*/
            return new MatchHandler;
        }
        void releaseHandler(MatchIf* handler) override {
            delete handler;
        }
};

void consume_task(){
    while(true){
        unique_lock<mutex>lck(message_queue.m);
        if(message_queue.q.empty()){
            //message_queue.cv.wait(lck);
            //continue;
            lck.unlock();
            pool.match();
            sleep(1);
        }
        else{
            auto task = message_queue.q.front();
            message_queue.q.pop();
            lck.unlock();

            //do task
            if(task.type=="add")pool.add(task.user);
            else if(task.type=="remove")pool.remove(task.user);

        }

    }
}

int main(int argc, char **argv) {
    TThreadedServer server(
            std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
            std::make_shared<TServerSocket>(9090), //port
            std::make_shared<TBufferedTransportFactory>(),
            std::make_shared<TBinaryProtocolFactory>()
    );


    cout<<"Start Match Server"<<endl;

    thread matching_thread(consume_task);

    server.serve();
    return 0;
}

Step.5 创建数据保存客户端文件

由于数据存储服务端acwing已经帮忙实现了,所以只需要根据需要生成客户端即可
match_system/src目录下创建save_client

thrift -r --gen cpp tutorial.thrift

这里tutorial.thrift要写thrift文件夹下的save.thrift位置
同时删除服务端的cpp文件(因为c++不能同时两个main,python不删可能没事)

测试一下,匹配结果数据能不能保存到myserver服务器上即可。当然在这之前你要完成匹配服务端文件中对save_result方法的编写

收获

该项目的文件放在gitlab上,各位感兴趣可以看看,项目地址
通过本次学习,增加了不少知识点,后续会写一些博客对其进行总结

  • 以thrift为例的RPC框架
  • c++ 并行编程
  • 线程进程的基本概念
  • 熟练了git的操作以及相关规范
  • c++ 条件变量,类的基本使用
  • 锁的概念

以上是关于[Linux]以匹配系统为例入门Thrift框架的主要内容,如果未能解决你的问题,请参考以下文章

Thrift入门 | Thrift框架分析(源码角度)

thrift入门

小白入门Thrift RPC框架

[linux环境] 基于thrift模拟游戏的简易匹配机制

idea远程调试

[linux环境] 基于thrift模拟游戏的简易匹配机制