thrift开发笔记

Posted 卷王2048

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了thrift开发笔记相关的知识,希望对你有一定的参考价值。

thrift开发笔记(4种由易到难的匹配系统)

准备工作

  1. 创建项目文件夹thrift_project

  2. 业务逻辑图

  3. 游戏节点,创建game文件夹;

    匹配系统节点,创建match_system文件夹;

    thrift相关文件,创建thrift文件夹

thrift简单语法介绍

使用Thrift开发程序,首先要做的事情就是对接口进行描述, 然后再使用Thrift将接口的描述文件编译成对应语言的版本

1.命名空间

thrift文件命名一般都是以.thrift作为后缀:XXX.thrift,可以在该文件的开头为该文件加上命名空间限制,格式为:

namespace 语言名称 名称

例如对c++来说,有:

namespace cpp match_service

2.数据类型

大小写敏感,它共支持以下几种基本的数据类型:

  1. string, 字符串类型,注意是全部小写形式;
  2. i16, 16位整形类型,
  3. i32,32位整形类型,对应C/C++/java中的int类型;
  4. i64,64位整形,对应C/C++/java中的long类型;
  5. byte,8位的字符类型,对应C/C++中的char,java中的byte类型
  6. bool, 布尔类型,对应C/C++中的bool,java中的boolean类型;
  7. double,双精度浮点类型,对应C/C++/java中的double类型;
  8. void,空类型,对应C/C++/java中的void类型;该类型主要用作函数的返回值,

除上述基本类型外,ID还支持以下类型:

  1. map,map类型,例如,定义一个map对象:map<i32, i32> newmap;
  2. set,集合类型,例如,定义set对象:set aSet;
  3. list,链表类型,例如,定义一个list对象:list aList;

struct,自定义结构体类型,在IDL中可以自己定义结构体,对应C中的struct,c++中的struct和class,java中的class。例如:

struct User
      1: i32 id,
      2: string name,
      3: i32 score

注意,在struct定义结构体时需要对每个结构体成员用序号标识:“序号: ”。

3.函数接口

文件中对所有接口函数的描述都放在service中,service的名字可以自己指定,该名字也将被用作生成的特定语言接口文件的名字。

接口函数需要对参数使用序号标号,除最后一个接口函数外,要以,结束对函数的描述。

如:

namespace cpp match_service

struct User 
    1: i32 id,
    2: string name,
    3: i32 score


service Match 

    i32 add_user(1: User user, 2: string info),

    i32 remove_user(1: User user, 2: string info),


服务端的建立

对于匹配系统的thrift相关配置,我们在thrift文件夹下,创建match.thrift文件

打开thrift官网,在上方选择Tutorial项,查看thrift官方教程

点击下方的tutorial.thrift进入一个示例文件

变写thrift配置文件,只需要在文件中写明接口对象.然后执行命令

thrift -r --gen <语言名> <.thrift文件的路径>

就会生成各种配置和连接文件,还有代码框架,只需要在框架中实现自己的业务即可

步骤

1.在thrift文件夹下,编辑match.thrift文件,用来生成匹配系统服务端的一系列文件

match.thrift 文件内容如下:


##c++命名空间
namespace cpp match_service

struct User 
    1: i32 id,
    2: string name,
    3: i32 score


service Match 

    /**
     * user: 添加的用户信息
     * info: 附加信息
     * 在匹配池中添加一个名用户
     */
    i32 add_user(1: User user, 2: string info),

    /**
     * user: 删除的用户信息
     * info: 附加信息
     * 从匹配池中删除一名用户
     */
    i32 remove_user(1: User user, 2: string info),

2.进入到match_system文件夹,创建src文件夹。在src下执行语句:

thrift -r --gen cpp ../../thrift/match.thrift

这样,thrift服务端的一系列文件就会生成在src文件夹中的gen-cpp文件夹下,为了划分业务模块将gen-cpp重命名为match_server

文件结构如下:

.
`-- match_server
    |-- Match.cpp
    |-- Match.h
    |-- Match_server.skeleton.cpp
    |-- match_types.cpp
    `-- match_types.h

其中Match_server.skeleton.cpp: 服务端的代码框架,具体业务就是在这个文件夹下编写实现

Match_server.skeleton.cpp移动到match_system/src下并重命名为main.cppmatch_system的整个业务逻辑就是在这个文件中实现

3.初始main.cpp的改动

  1. 之前main.cppmatch_server下,现在在match_system/src下,所以main.cpp中对Match.h头文件的引入需要修改路径
  2. 文件中的两个函数int32_t add_userint32_t remove_user需要有返回值,原来没有,会报警告,需要手动加上

main.cpp初始文件内容如下:

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using namespace  ::match_service;

class MatchHandler : virtual public MatchIf 
 public:
  MatchHandler() 
    // Your initialization goes here
  

  int32_t add_user(const User& user, const std::string& info) 
    // Your implementation goes here
    printf("add_user\\n");


    return 0;
  

  int32_t remove_user(const User& user, const std::string& info) 
    // Your implementation goes here
    printf("remove_user\\n");



    return 0;
  

;

int main(int argc, char **argv) 
  int port = 9090;
  ::std::shared_ptr<MatchHandler> handler(new MatchHandler());
  ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
  ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
  ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
  ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
  server.serve();
  return 0;

c++文件的编译、链接和运行

C++的编译过程

(注意大小写)假设我有一个main.cpp文件

  1. -E:只对文件进行预处理,不进行编译和汇编。g++ -E main.cpp——>在dos命令行查看某文件的预处理过程,如果你想查看详细的预处理,可以重定向到一个文件中,如:g++ -E main.cpp -o main.i

  2. -s:编译到汇编语言,不进行汇编和链接,即只激活预处理和编译,生成汇编语言,如果你想查看详细的编译,可以重定向到一个文件中,如:g++ -S main.cpp -o main.s

  3. -c:编译到目标代码,g++ -c main.s -o 文件名.o

  4. -o:生成链接文件: 如果该文件是独立的,与其他自己编写的文件无依赖关系。直接g++ main.o -o 生成的可执行文件的文件名

    假设该文件依赖其他源文件(不需要加入头文件)temp.cpp,在对temp.cpp文件进行预处理->编译->汇编后,使用指令g++ temp.o main.o -o main

  5. .\\:执行文件,输出结果。如: .\\main,当然你可以直接g++ main.cpp temp.cpp -o main 生成目标文件让编译器自动为你处理其他流程。

步骤

1.编译src文件夹下的所有.cpp文件

g++ -c *.cpp

2.将所有生成的.o文件链接成一个可执行文件,要用到thrift动态链接库

g++ *.o -o main -lthrift

3.执行生成的可执行文件main

./main

为了判断文件是否正确执行,可以在main.cpp中写一些输出语句,验证效果

4.将项目版本提交git,提交时,一般会删除中间生成的文件和可执行文件

git add .
git restore --stage *.o
git restore --stage match_system/src/main
git commit -m "first can run"

问题:

第一次写的时候 添加一个人会调用2次add_user函数,没有找到问题,全部重做一遍,问题消失

客户端的实现

python文件的执行

python 文件路径

步骤

1.在game下创建src,在src下执行:

thrift -r --gen py ../../thrift/match.thrift

这样,thrift服务端的一系列文件就会生成在src文件夹中的gen-py文件夹下,为了划分业务模块将gen-py重命名为match_client

文件结构如下:

.
|-- __init__.py
`-- match
    |-- Match-remote
    |-- Match.py
    |-- __init__.py
    |-- constants.py
    `-- ttypes.py

因为我们只需要实现客户端,不需要服务端,所以可以把Match-remote删除

2.在src下创建文件client.py,将 Apache Thrift - Python ,页面中,client中的代码复制到该文件中,并将代码进行适当的改动和删除,client.py中的初始代码如下:

from match_client.match import Match
from match_client.match.ttypes import User

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol


def main():
    # Make socket
    transport = TSocket.TSocket('127.0.0.1', 9090)

    # Buffering is critical. Raw sockets are very slow
    transport = TTransport.TBufferedTransport(transport)

    # Wrap in a protocol
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    # Create a client to use the protocol encoder
    client = Match.Client(protocol)

    # Connect!
    transport.open()
    
    user = User(1,"lyt",50000)
    client.add_user(user,"")

    # Close!
    transport.close()


if __name__ == "__main__":
    main()

3.运行查错

  1. 先在thrift_project/match_system/src下,执行:./main,使服务端运行
  2. 再在thrift_project/game/src下,执行:python3 client.py,使客户端运行
  3. 观察服务端运行处有无相应输出,若有,说明成功运行

我们可以将此版本代码提交git

4.第一个简单输入输出版的client

from match_client.match import Match
from match_client.match.ttypes import User

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from sys import stdin


def operate(op: str, user_id: int, username: str, score: int):
    # Make socket
    transport = TSocket.TSocket('localhost', 9090)

    # Buffering is critical. Raw sockets are very slow
    transport = TTransport.TBufferedTransport(transport)

    # Wrap in a protocol
    protocol = TBinaryProtocol.TBinaryProtocol(transport)

    # Create a client to use the protocol encoder
    client = Match.Client(protocol)

    # Connect!
    transport.open()

    user = User(user_id, username, score)
    
    if op == "add":
        client.add_user(user, "")
    elif op == "remove":
        client.remove_user(user, "")

    # Close!
    transport.close()


def main():
    for line in stdin:
        op, user_id, username, score = line.split(" ")
        operate(op, int(user_id), username, int(score))



if __name__ == "__main__":
    main()

进行运行查错步骤并做正确输入,如果服务端处有相应输出,说明函数调用成功,运行成功

我们可以将此版本代码提交git

服务端具体匹配业务的实现

多线程编程

学习各种语言多线程,需要关注的要点:

  1. 用哪些头文件
  2. 如何创建一个线程(创建线程要用哪些函数)
  3. 如何使用锁(相关的函数)
  4. 如何使用条件变量

c++多线程

参考博客:

C++多线程编程 - kaleidopink - 博客园 (cnblogs.com)

C++多线程编程_Nine days-CSDN博客_c++多线程

java多线程

步骤

1.继续编写thrift_project/match_system/src下的main.cpp

先添加头文件:

#include "mutex"  //锁的头文件
#include "thread"  //线程的头文件
#include "condition_variable"  //条件变量的头文件
#include "queue"

傻瓜式匹配版main.cpp:

// 这个自动生成的框架文件演示了如何构建服务器。
// 你应该把它复制到另一个文件名以避免覆盖它。

#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <iostream>
#include "mutex"  //锁的头文件
#include "thread"  //线程的头文件
#include "condition_variable"  //条件变量的头文件
#include "queue"
#include "vector"

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace std;
using namespace ::match_service;

struct Task 
    User user;
    string type;
;

struct MessageQueue 
    //队列是互斥的,同时只能有一个线程访问队列
    queue<Task> q;
    mutex m;
    condition_variable cv;
 message_queue;

class Poll 
public:
    void add(User user) 
        users.push_back(user);
    

    void remove(User user) 
        for (uint32_t i = 0; i < users.size(); i++) 
            if (users[i].id == user.id) 
                users.erase(users.begin() + i);
                break;
            
        
    

    void match() 
        while (users.size() > 1) 
//            printf("队列长度为: %ld\\n", users.size());
            auto player1 = users[0];
            auto player2 = users[1];
            users.erase(users.begin());
            users.erase(users.begin());

            save_result(player1.id, player2.id);
        
    

    void save_result(int a, int b) 
        printf(" %d 和 %d 匹配成功\\n", a, b);
    

private:
    vector<User> users;
 pool;


class MatchHandler : virtual public MatchIf 
public:
    MatchHandler() 
        // 在这里初始化
    

    int32_t add_user(const User &user, const std::string &info) 
        // 在这里实现接口
        printf("add_user\\n");


        unique_lock<mutex> lock1(message_queue.m);//加锁
        message_queue.q.push(user, "add");
        //当有操作时,应该唤醒线程
        message_queue.cv.notify_all();


        return 0;
    

    int32_t remove_user(const User &user, const std::string &info) 
        // 在这里实现接口
        printf("remove_user\\n");


        unique_lock<mutex> lock1(message_queue.m);//加锁,在队列为空的时候,不能拿到锁
        message_queue.q.push(user, "remove");
        //当有操作时,应该唤醒线程
        message_queue.cv.notify_all();

        return 0;
    

;


//线程操作的函数
void consume_task() 
    while (true) 
        unique_lock<mutex> lock1(message_queue.m);//加锁
        if (message_queue.q.empty()) 
            //因为队列初始一定是空的,如果直接continue,会死循环。因此在初始时,应在有add操作后,才应该执行这里
            //continue
            message_queue.cv.wait(lock1);
         else 
            auto task = message_queue.q.front();
            message_queue.q.pop();
            //因为只有队列是互斥的,为了保证程序的快速运行,操作完队列就应该释放锁
            lock1.unlock();
            //具体任务
            if (task.type == "add") 
                pool.add(task.user);
//                printf("执行了1次add");
             else if (task.type == "remove") 
                pool.remove(task.user);
            
            pool.match();

        
    


int main(int argc, char **argv) 
    int port = 9090;
    ::std::shared_ptr<MatchHandler> handler(new MatchHandler());
    ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
    ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
    printf("Match server start\\n");

    thread matching_thread(consume_task);


    server.serve();
    return 0;


2.编译链接main.cpp

先编译main.cpp,在链接时,要用到thrift动态链接库线程相关的动态链接库,所以链接时应该执行:

g++ *.o -o main -lthrift -pthread

数据存储客户端的实现

获取一个字符串的MD5加密串

执行命令:

md5sum

回车后输入原字符串。在回车后按ctrl+d,就会生成MD5加密串

步骤:

1.在thrift文件夹下,编辑save.thrift,用来生成数据存储客户端的一系列文件

这里的thrift接口由y总给出,可以在上课代码的git上找到,save.thrift中的代码如下:

namespace cpp save_service

service Save 

    /**
     * username: myserver的名称
     * password: myserver的密码的md5sum的前8位
     * 用户名密码验证成功会返回0,验证失败会返回1
     * 验证成功后,结果会被保存到myserver:homework/lesson_6/result.txt中
     */
    i32 save_data(1: string username, 2: string password, 3: i32 player1_id, 4: i32 player2_id)


2.在match_system/src下执行:

thrift -r --gen cpp ../../thrift/save.thrift

这样,thrift服务端的一系列文件就会生成在src文件夹中的gen-cpp文件夹下,为了划分业务模块将gen-cpp重命名为save_client

注意:

由于c++整个项目只能有一个main函数,而整个服务端的逻辑都在thrift_project/match_system/src下的main.cpp实现。所以一定要删除thrift_project/match_system/src/save_client下的Save_server.skeleton.cpp。而python没有这个问题,所以在用python实现客户端时,主框架文件可删可不删。

3.改动main.cpp将数据存储端的业务写进去

改动点:
  1. 引入头文件,即save_client/Save.h
  2. 补全命名空间,即添加using namespace ::save_service;
  3. class Pool中的save_resut函数中,添加官网 C++样例client中的main函数中的所有代码
  4. 由于数据存储是实现在myserver上,所以在连接时要更改ip地址myserver的ip地址可以执行homework 4 getinfo查看
  5. CalculatorClient改为SaveClient
  6. transport->open()transport->close();之间的教程代码删除,在此之间实现自己的业务

这里给出save_result函数:

    void save_result(int a, int b) 
        printf(" %d 和 %d 匹配成功\\n", a, b);

        std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
        std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
        std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
        CalculatorClient client(protocol);

        try 
            transport->open();
            //----------在此之间实现自己的业务------------
                //第二个参数是myserver密码的MD5值的前8位
            client.save_data("acs_1642","6a46581f",a,b);

            //-----------------------------------------
            transport->close();
         catch (TException &tx) 
            cout << "ERROR: " << tx.what() << endl;
        

    

4.编译运行,并验证结果

1.编译链接:

g++ -c save_client/*.cpp
g++ -c main.cpp
g++ *.o -o main -lthrift -pthread

2.登录到myserver服务器上查看存储的结果:

ssh myserver
cd homework/lesson_6 
cat result.txt

可以把此版本提交git

匹配系统2.0(按照分差匹配用户)

c++lamda表达式

C++之Lambda表达式 - 季末的天堂 - 博客园 (cnblogs.com)

改动main.cpp:

// 这个自动生成的框架文件演示了如何构建服务器。
// 你应该把它复制到另一个文件名以避免覆盖它。

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <iostream>
#include <mutex>  //锁的头文件
#include <thread>  //线程的头文件
#include <condition_variable>  //条件变量的头文件
#include <queue>
#include <vector>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service;
using namespace std;

struct Task 
    User user;
    string type;
;

struct MessageQueue 
    //队列是互斥的,同时只能有一个线程访问队列
    queue <Task> q;
    mutex m;
    condition_variable cv;
 message_queue;

class Poll 
public:
    void add(User user) 
        users.push_back(user);
    

    void remove(User user) 
        for (uint32_t i = 0; i < users.size(); i++) 
            if (users[i].id == user.id) 
                users.erase(users.begin() + i);
                break;
            
        
    

    void match() 
        while (users.size() > 1) 
//            printf("队列长度为: %ld\\n", users.size());
            sort(users.begin(),users.end(),[&](User &a,User b)
                return a.score<b.score;
            );
            bool flag = true;
            for(uint32_t i = 1 ; i <users.size(); i++) 
              auto a = users[i-1],b = users[i];
                if (b.score-a.score<=50)
                    users.erase(users.begin()+i,users.begin()+i+1);

                    save_result(a.id,b.id);
                    flag = false;
                    break;
                
            
            if (flag)
                break;
            

        
    

    void save_result(int a, int b) 
        printf(" %d 和 %d 匹配成功\\n", a, b);

        std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
        std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
        std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
        SaveClient client(protocol);

        try 
            transport->open();
            //----------在此之间实现自己的业务------------
                //第二个参数是myserver密码的MD5值的前8位
            client.save_data("acs_1642","6a46581f",a,b);

            //-----------------------------------------
            transport->close();
         catch (TException &tx) 
            cout << "ERROR: " << tx.what() << endl;
        

    

private:
    vector <User> users;
 pool;


class MatchHandler : virtual public MatchIf 
public:
    MatchHandler() 
        // 在这里初始化
    

    int32_t add_user(const User &user, const std::string &info) 
        // 在这里实现接口
        printf("add_user\\n");


        unique_lock <mutex> lock1(message_queue.m);//加锁
        message_queue.q.push(user, "add");
        //当有操作时,应该唤醒线程
        message_queue.cv.notify_all();


        return 0;
    

    int32_t remove_user(const User &user, const std::string &info) 
        // 在这里实现接口
        printf("remove_user\\n");


        unique_lock <mutex> lock1(message_queue.m);//加锁,在队列为空的时候,不能拿到锁
        message_queue.q.push(user, "remove");
        //当有操作时,应该唤醒线程
        message_queue.cv.notify_all();

        return 0;
    

;


//线程操作的函数
void consume_task() 
    while (true) 
        unique_lock <mutex> lock1(message_queue.m);//加锁
        if (message_queue.q.empty()) 
            //因为队列初始一定是空的,如果直接continue,会死循环。因此在初始时,应在有add操作后,才应该执行这里
            //continue
//            message_queue.cv.wait(lock1);
            lock1.unlock();
            pool.match();
            sleep(1);
            //当队列为空时。当前线程放弃持有锁,由其他线程持有锁,在进行匹配。这个过程1s后,再进行后面的判断
         else 
            auto task = message_queue.q.front();
            message_queue.q.pop();
            //因为只有队列是互斥的,为了保证程序的快速运行,操作完队列就应该释放锁
            lock1.unlock();
            //具体任务
            if (task.type == "add") 
                pool.add(task.user);
//                printf("执行了1次add");
             else if (task.type == "remove") 
                pool.remove(task.user);
            
            pool.match();

        
    


int main(int argc, char **argv) 
    int port = 9090;
    ::std::shared_ptr<MatchHandler> handler(new MatchHandler());
    ::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
    ::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    ::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    ::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
    printf("Match server start\\n");

    thread matching_thread(consume_task);


    server.serve();
    return 0;


匹配系统3.0(升级为多线程服务器)

之前的版本都是用一个线程来add和remove user,想要提高效率和并发量,可以将服务端升级为多线程版本

步骤

  1. 引入官网 C++样例Server中,main.cpp没有的头文件。

  2. main函数中的TSimpleServer即相关函数,替换成官网 C++样例Server中的main函数中的TThreadedServer相关内容

  3. 官网 C++样例Server中的class CalculatorCloneFactory相关内容加进来

  4. 将文件中的所有Calculator替换为Match,在vim中的具体操作为:

    :1,$s/Calculator/Match/g
    
  5.     void releaseHandler(::shared::SharedServiceIf *handler) override 
            delete handler;
        
    

    替换为:

        void releaseHandler(MatchIf *handler) override 
            delete handler;
        
    

编译运行检查错误

匹配系统4.0(随时间扩大匹配阈值)

// 这个自动生成的框架文件演示了如何构建服务器。
// 你应该把它复制到另一个文件名以避免覆盖它。

#include "match_server/Match.h"
#include "save_client/Save.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TTransportUtils.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/TToString.h>
#include <iostream>
#include <mutex>  //锁的头文件
#include <thread>  //线程的头文件
#include <condition_variable>  //条件变量的头文件
#include <queue>
#include <vector>
#include <unistd.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace ::save_service;
using namespace std;

struct Task 
    User user;
    string type;
;

struct MessageQueue 
    //队列是互斥的,同时只能有一个线程访问队列
    queue<Task> q;
    mutex m;
    condition_variable cv;
 message_queue;

class Poll 
public:
    void add(User user) 
        users.push_back(user);
        wt.push_back(0);
    

    void remove(User user) 
        for (uint32_t i = 0; i < users.size(); i++) 
            if (users[i].id == user.id) 
                users.erase(users.begin() + i);
                break;
            
        
     

    bool check_match(uint32_t i, uint32_t j) 
        auto a = users[i], b = users[j];

        int dt = abs(a.score - b.score);
        int a_max_dif = wt[i] * 50;
        int b_max_dif = wt[j] * 50;

        return dt <= a_max_dif && dt <= b_max_dif;
    

    void match() 
        for (uint32_t i = 0; i < wt.size(); i++)
            wt[i]++;   // 等待秒数 + 1
//            sort(users.begin(), users.end(), [&](User &a, User b) 
//                return a.score < b.score;
//            );
        while (users.size() > 1) 
            bool flag = true;
            for (uint32_t i = 0; i < users.size(); i++) 
                for (uint32_t j = i + 1; j < users.size(); j++) 
                    if (check_match(i, j)) 
                        auto a = users[i], b = users[j];
                        users.erase(users.begin() + j);
                        users.erase(users.begin() + i);
                        wt.erase(wt.begin() + j);
                        wt.erase(wt.begin() + i);
                        save_result(a.id, b.id);
                        flag = false;
                        break;
                    
                

                if (!flag) break;
            

            if (flag) break;
        
    


    void save_result(int a, int b) 
        printf(" %d 和 %d 匹配成功\\n", a, b);

        std::shared_ptr<TTransport> socket(new TSocket("123.57.47.211", 9090));
        std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
        std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
        SaveClient client(protocol);

        try 
            transport->open();
            //----------在此之间实现自己的业务------------
            //第二个参数是myserver密码的MD5值的前8位
            client.save_data("acs_1642", "6a46581f", a, b);

            //-----------------------------------------
            transport->close();
         catch (TException &tx) 
            cout << "ERROR: " << tx.what() << endl;
        

    

private:
    vector<User> users;
    vector<int> wt;
 pool;


class MatchHandler : virtual public MatchIf 
public:
    MatchHandler() 
        // 在这里初始化
    

    int32_t add_user(const User &user, const std::string &info) 
        // 在这里实现接口
        printf("add_user\\n");


        unique_lock<mutex> lock1(message_queue.m);//加锁
        message_queue.q.push(user, "add");
        //当有操作时,应该唤醒线程
        message_queue.cv.notify_all();


        return 0;
    

    int32_t remove_user(const User &user, const std::string &info) 
        // 在这里实现接口
        printf("remove_user\\n");


        unique_lock<mutex> lock1(message_queue.m);//加锁,在队列为空的时候,不能拿到锁
        message_queue.q.push(user, "remove");
        //当有操作时,应该唤醒线程
        message_queue.cv.notify_all();

        return 0;
    

;

class MatchCloneFactory : virtual public MatchIfFactory 
public:
    ~MatchCloneFactory() override = default;

    MatchIf *getHandler(const ::apache::thrift::TConnectionInfo &connInfo) override 
        std::shared_ptr<TSocket> sock = std::dynamic_pointer_cast<TSocket>(connInfo.transport);
//        cout << "Incoming connection\\n";
//        cout << "\\tSocketInfo: "  << sock->getSocketInfo() << "\\n";
//        cout << "\\tPeerHost: "    << sock->getPeerHost() << "\\n";
//        cout << "\\tPeerAddress: " << sock->getPeerAddress() << "\\n";
//        cout << "\\tPeerPort: "    << sock->getPeerPort() << "\\n";
        return new MatchHandler;
    

    void releaseHandler(MatchIf *handler) override 
        delete handler;
    
;

//线程操作的函数
void consume_task() 
    while (true) 
        unique_lock<mutex> lock1(message_queue.m);//加锁
        if (message_queue.q.empty()) 
            //因为队列初始一定是空的,如果直接continue,会死循环。因此在初始时,应在有add操作后,才应该执行这里
            //continue
//            message_queue.cv.wait(lock1);
            lock1.unlock();
            pool.match();
            sleep(1);
            //当队列为空时。当前线程放弃持有锁,由其他线程持有锁,在进行匹配。这个过程1s后,再进行后面的判断
         else 
            auto task = message_queue.q.front();
            message_queue.q.pop();
            //因为只有队列是互斥的,为了保证程序的快速运行,操作完队列就应该释放锁
            lock1.unlock();
            //具体任务
            if (task.type == "add") 
                pool.add(task.user);
//                printf("执行了1次add");
             else if (task.type == "remove") 
                pool.remove(task.user);
            
//            pool.match();

        
    


int main(int argc, char **argv) 
    TThreadedServer server(
            std::make_shared<MatchProcessorFactory>(std::make_shared<MatchCloneFactory>()),
            std::make_shared<TServerSocket>(9090), //port
            std::make_shared<TBufferedTransportFactory>(),
            std::make_shared<TBinaryProtocolFactory>()
    );
    printf("Match server start\\n");

    thread matching_thread(consume_task);


    server.serve();
    return 0;


lock1(message_queue.m);//加锁
if (message_queue.q.empty())
//因为队列初始一定是空的,如果直接continue,会死循环。因此在初始时,应在有add操作后,才应该执行这里
//continue
// message_queue.cv.wait(lock1);
lock1.unlock();
pool.match();
sleep(1);
//当队列为空时。当前线程放弃持有锁,由其他线程持有锁,在进行匹配。这个过程1s后,再进行后面的判断
else
auto task = message_queue.q.front();
message_queue.q.pop();
//因为只有队列是互斥的,为了保证程序的快速运行,操作完队列就应该释放锁
lock1.unlock();
//具体任务
if (task.type == “add”)
pool.add(task.user);
// printf(“执行了1次add”);
else if (task.type == “remove”)
pool.remove(task.user);

// pool.match();

    

int main(int argc, char **argv)
TThreadedServer server(
std::make_shared(std::make_shared()),
std::make_shared(9090), //port
std::make_shared(),
std::make_shared()
);
printf(“Match server start\\n”);

thread matching_thread(consume_task);


server.serve();
return 0;


以上是关于thrift开发笔记的主要内容,如果未能解决你的问题,请参考以下文章

thrift开发笔记(4种由易到难的匹配系统)

thrift学习笔记 thrift简介及第一个helloword程序

Scala + Thrift+ Zookeeper+Flume+Kafka配置笔记

RPC远程协议之Thrift入门

thrift-go(golang)Server端笔记

go thrift 开发