muduo网络库源码寻宝:TcpServer

Posted 看,未来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了muduo网络库源码寻宝:TcpServer相关的知识,希望对你有一定的参考价值。

耗时一个月,总算是看完一遍再抄写了一遍了,发现很多不错的东西,不过之前只顾着看,没有过多的总结,现在开始讲讲我的心得吧,能力有限,能讲到哪里算哪里。

一切尽在注释中。。。


muduo库使用案例

#include<iostream>
#include "chatserver.hpp"
#include "chatservice.hpp"

using namespace std;

int main(int argc, char **argv){

    char *ip = argv[1];
    uint16_t port = atoi(argv[2]);

    EventLoop loop;
    InetAddress addr(ip, port);
    ChatServer server(&loop,addr,"ChatServer");	//这里的server主要是一个TcpServer对象

    server.start();
    loop.loop();
    return 0;
}

TcpSerVer 类声明

#pragma once

//EventLoop相关
#include "EventLoop.hpp"
#include "EventLoopThreadPool.hpp"

//Socket相关
#include "Accept.hpp"
#include "InetAddr.hpp"
#include "TcpConnection.hpp"

//else
#include "Logger.hpp"
#include "nocopyable.hpp"
#include "callback.hpp"

#include <string>
#include <functional>
#include <atomic>
#include <unordered_map>
#include <memory>

看这个头文件就大概知道这个类会牵扯到哪些部分了,有EventLoop、TCPConnection、Logger


class TcpServer : nocpoyable
{
private:
    /// Not thread safe, but in loop
    void newConnection(int sockfd, const InetAddress& peerAddr);
    /// Thread safe.
    void removeConnection(const TcpConnectionptr& conn);
    /// Not thread safe, but in loop
    void removeConnectionInLoop(const TcpConnectionptr& conn);

    using ConnectionMap = std::unordered_map<std::string, TcpConnectionptr>;

    EventLoop* loop_; //baseloop
    const std::string name_;
    const std::string ipport_;
    std::unique_ptr<Accept> acceptor_;
    std::shared_ptr<EventLoopThreadPool> threadpool_;

    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    ThreadInitCallback threadInitCallback_;

    std::atomic_int started_;

    int nextConnId_;
    ConnectionMap connections_;
};

成员对象不少,比较重要的有:

 EventLoop* loop_; //baseloop
 std::unique_ptr<Accept> acceptor_;
 std::shared_ptr<EventLoopThreadPool> threadpool_;
 ConnectionMap connections_;

class TcpServer : nocpoyable
{
public:
    //using ThreadInitCallback = std::function<void(EventLoop *)>;

    enum Option
    {
        kNoReusePort,
        kReusePort,
    };

    TcpServer(EventLoop* loop,
        const InetAddress& listenAddr,
        const std::string& nameArg,
        Option option = kNoReusePort);
    ~TcpServer(); // force out-line dtor, for std::unique_ptr members.

    const std::string& ipPort() const { return ipport_; }
    const std::string& name() const { return name_; }
    EventLoop* getLoop() const { return loop_; }

    //设置底层subloop个数
    void setThreadNum(int numThreads);

    //开启服务器监听
    void start();

    void setThreadInitCallback(const ThreadInitCallback& cb)
    {
        threadInitCallback_ = cb;
    }

    void setConnectionCallback(const ConnectionCallback& cb)
    {
        connectionCallback_ = cb;
    }

    void setMessageCallback(const MessageCallback& cb)
    {
        messageCallback_ = cb;
    }

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    {
        writeCompleteCallback_ = cb;
    }
};

函数声明没什么好看的,这些回调嘛,结合项目来看会比较好:chat集群聊天室项目 代码+讲解(一):网络模块


类方法实现

这里有必要放一下这张图:

#include "TcpServer.hpp"

EventLoop* CheckLoopNotNull(EventLoop* loop) {
    if (loop == nullptr) {
        LOG_FATAL("%s:%s:%d mainloop is null \\n", __FILE__, __FUNCTION__, __LINE__);	//以后打印日志这样打印
    }

    return loop;
}

TcpServer::TcpServer(EventLoop* loop,
    const InetAddress& listenAddr,
    const std::string& nameArg,
    Option option = kNoReusePort)
    :loop_(CheckLoopNotNull(loop)),
    ipport_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Accept(loop, listenAddr, option == kReusePort)),	
    //在这里对sock进行了初始化,不过还没有监听,更没有accept,关于accept在后续章节再提,快了
    //只有在 server start 之后才会listen,listen到才会去accept
    
    threadpool_(new EventLoopThreadPool(loop, name_)),
		//构建一个 EventLoopThreadPool (可以视为mainreactor)对象,
		//不过也就是构建一下,不干啥,关于EventLoopThreadPool的章节后面会提
		//start之后会创建制定数量的线程,并绑定新的loop,返回地址。
	
    connectionCallback_(),
    messageCallback_(),
    nextConnId_(1)
{
    //当有新用户连接时,会执行NewConnectionCallback
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
    //在Acceptor 的handleread方法了被调用
}

//开启服务器监听
void TcpServer::start() {
    if (started_ == 0) {  //防止被多次start
        threadpool_->start(threadInitCallback_);	//EventLoopThreadPool的start
        loop_->runInLoop(std::bind(&Accept::listen, acceptor_.get()));	//这里拿来run的loop就是mainloop
        ++started_;
    }
}

//当有新链接来的时候,acceptor会调用这个回调
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
    //根据轮询算法,选择一个subloop,唤醒subloop
    EventLoop* ioloop = threadpool_->GetNextLoop();
    char buf[64] = { 0 };
    snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);
    ++nextConnId_;
    std::string connName = name_ + buf;

    LOG_INFO("TcpConnnection::newConnection [%s] -new connection [%s] from %s \\n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

    //通过sockfd获取其本机IP
    sockaddr_in local;
    ::bzero(&local, sizeof local);
    socklen_t addrlen = sizeof local;

    if (::getSockname(sockfd, (sockaddr*)&local, &addrlen) < 0) {
				//pass,日志打印,写漏了
    }
    InetAddress localAddr(::getLocalAddr(sockfd));

    //根据连接成功的fd,创建TCPConnection连接对象
		//一个连接对应一个 TCPConnectionptr 管理
		//关于TCPConnection的事情也是接下来展开
    TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));
    connections_[connName] = conn;

    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
    
    ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
    //把当前connfd封装成channel分发给subloop

}

TcpServer::~TcpServer() {}


//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {
    threadpool_->setThreadNum(numThreads);
}


void TcpServer::removeConnection(const TcpConnectionptr& conn) {
    loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

/// Not thread safe, but in loop
void TcpServer::removeConnectionInLoop(const TcpConnectionptr& conn) {
    connections_.erase(conn->name());
   
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

以上是关于muduo网络库源码寻宝:TcpServer的主要内容,如果未能解决你的问题,请参考以下文章

C++搭建集群聊天室:muduo网络库

muduo库中TcpServer一次完整的工作流程

缩略muduo库:TcpServer

缩略版muduo网络库:事件处理器 Chanel

缩略版muduo网络库:事件处理器 Chanel

Muduo网络库源码分析 EventLoop事件循环(Poller和Channel)