缩略muduo库:TcpServer

Posted 看,未来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了缩略muduo库:TcpServer相关的知识,希望对你有一定的参考价值。

这里不准备逐行讲解了,直接放代码吧。

#pragma once

#include "EventLoop.hpp"
#include "Accept.hpp"
#include "EventLoopThreadPool.hpp"
#include "InetAddr.hpp"
#include "nocopyable.hpp"
#include "callback.hpp"

#include <string>
#include <functional>
#include <atomic>
#include <unordered_map>
#include <memory>


/// TCP server front-end: owns the acceptor, the pool of sub-loops and the map
/// of live connections, and stores the user callbacks that are handed to each
/// new connection.
///
/// NOTE(review): the base class is spelled `nocpoyable` here while the header
/// is named "nocopyable.hpp" — confirm the spelling matches the declaration.
class TcpServer : nocpoyable
{
public:
    // ThreadInitCallback is expected to come from one of the included project
    // headers; the local alias below was left disabled by the author.
    //using ThreadInitCallback = std::function<void(EventLoop *)>;

    /// Whether the listening socket is created with port reuse enabled.
    enum Option
    {
        kNoReusePort,
        kReusePort,
    };

    /// @param loop        base loop that runs the acceptor (must not be null)
    /// @param listenAddr  address the server listens on
    /// @param nameArg     server name, used as the prefix of connection names
    /// @param option      port-reuse option forwarded to the acceptor
    TcpServer(EventLoop* loop,
        const InetAddress& listenAddr,
        const std::string& nameArg,
        Option option = kNoReusePort);
    ~TcpServer(); // force out-line dtor, for std::unique_ptr members.

    const std::string& ipPort() const { return ipport_; }
    const std::string& name() const { return name_; }
    EventLoop* getLoop() const { return loop_; }

    /// Sets the number of underlying sub-loops (forwarded to the thread pool).
    void setThreadNum(int numThreads);

    /// Starts the thread pool and begins listening; repeated calls are ignored.
    void start();

    /// Stores the callback passed to the thread pool when it is started.
    void setThreadInitCallback(const ThreadInitCallback& cb)
    {
        threadInitCallback_ = cb;
    }

    /// Stores the callback installed on every new connection as its
    /// connection callback.
    void setConnectionCallback(const ConnectionCallback& cb)
    {
        connectionCallback_ = cb;
    }

    /// Stores the callback installed on every new connection as its
    /// message callback.
    void setMessageCallback(const MessageCallback& cb)
    {
        messageCallback_ = cb;
    }

    /// Stores the callback installed on every new connection as its
    /// write-complete callback.
    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    {
        writeCompleteCallback_ = cb;
    }

private:
    /// Not thread safe, but in loop
    void newConnection(int sockfd, const InetAddress& peerAddr);
    /// Thread safe.
    void removeConnection(const TcpConnectionptr& conn);
    /// Not thread safe, but in loop
    void removeConnectionInLoop(const TcpConnectionptr& conn);

    /// Live connections keyed by connection name.
    using ConnectionMap = std::unordered_map<std::string, TcpConnectionptr>;

    EventLoop* loop_; // base loop
    const std::string name_;
    const std::string ipport_;
    std::unique_ptr<Accept> acceptor_;
    std::shared_ptr<EventLoopThreadPool> threadpool_;

    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    ThreadInitCallback threadInitCallback_;

    // Explicitly zero-initialised: start() reads this flag, and a
    // default-constructed std::atomic_int holds an indeterminate value
    // before C++20.
    std::atomic_int started_{0};

    int nextConnId_;          // sequence number appended to connection names
    ConnectionMap connections_;
};
#include "TcpServer.hpp"
#include "Logger.hpp"
#include "TcpConnection.hpp"

#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>

#include <cstdio>
#include <functional>

/// Validates the base loop handed to TcpServer.
///
/// Logs through LOG_FATAL when `loop` is null, then returns `loop` unchanged
/// so the call can be used directly inside a member-initialiser list.
///
/// @param loop  candidate base loop
/// @return the same pointer that was passed in
EventLoop* CheckLoopNotNull(EventLoop* loop) {
    if (loop == nullptr) {
        // The format string previously ended in an escaped backslash ("\\n"),
        // which printed the two characters '\' and 'n' instead of a newline.
        LOG_FATAL("%s:%s:%d mainloop is null \n", __FILE__, __FUNCTION__, __LINE__);
    }

    return loop;
}

/// Builds the server: validates the base loop, creates the acceptor and the
/// sub-loop thread pool, and registers newConnection() as the acceptor's
/// new-connection callback.
///
/// @param loop        base loop; checked by CheckLoopNotNull
/// @param listenAddr  address to listen on; its ip:port string is cached
/// @param nameArg     server name
/// @param option      kReusePort enables port reuse on the acceptor
///
/// The default value of `option` lives on the in-class declaration only;
/// repeating it on this out-of-class definition is ill-formed.
TcpServer::TcpServer(EventLoop* loop,
    const InetAddress& listenAddr,
    const std::string& nameArg,
    Option option)
    // Initialisers are listed in member-declaration order (name_ before
    // ipport_), which is the order in which they actually run.
    : loop_(CheckLoopNotNull(loop)),
    name_(nameArg),
    ipport_(listenAddr.toIpPort()),
    acceptor_(new Accept(loop, listenAddr, option == kReusePort)),
    threadpool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(),
    messageCallback_(),
    started_(0),   // start() tests this flag, so it must begin at a known value
    nextConnId_(1)
{
    // When a new client connects, the acceptor invokes newConnection().
    acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2));
}


/// Called by the acceptor whenever a new connection has been accepted.
///
/// Picks a loop from the thread pool, builds a unique connection name,
/// creates the TcpConnection, records it in connections_, installs the
/// stored callbacks and finally schedules connectEstablished() on the
/// chosen loop.
///
/// @param sockfd    file descriptor of the accepted socket
/// @param peerAddr  address of the remote peer
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr) {
    // Select the sub-loop that will own this connection (round-robin,
    // according to the original author's comment).
    EventLoop* ioloop = threadpool_->GetNextLoop();

    // Connection name = "<server name>-<ip:port>#<sequence id>".
    char buf[64] = { 0 };
    snprintf(buf, sizeof buf, "-%s#%d", ipport_.c_str(), nextConnId_);
    ++nextConnId_;
    const std::string connName = name_ + buf;

    LOG_INFO("TcpServer::newConnection [%s] -new connection [%s] from %s \n",
        name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

    // Query the local address bound to sockfd.
    sockaddr_in local{};
    socklen_t addrlen = sizeof local;
    if (::getsockname(sockfd, reinterpret_cast<sockaddr*>(&local), &addrlen) < 0) {
        // NOTE(review): only LOG_INFO and LOG_FATAL are visible in this file;
        // switch to an error-level macro if Logger.hpp provides one.
        LOG_INFO("TcpServer::newConnection [%s] - getsockname failed for fd %d \n",
            name_.c_str(), sockfd);
    }
    // NOTE(review): assumes InetAddress is constructible from sockaddr_in —
    // confirm against InetAddr.hpp.
    InetAddress localAddr(local);

    // Create the TcpConnection object for the accepted fd and keep it alive
    // in the connection map.
    TcpConnectionptr conn(new TcpConnection(ioloop, connName, sockfd, localAddr, peerAddr));
    connections_[connName] = conn;

    conn->setConnectionCallback(connectionCallback_);
    conn->setMessageCallback(messageCallback_);
    conn->setWriteCompleteCallback(writeCompleteCallback_);
    conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)); // FIXME: unsafe

    // Hand the connection over to its sub-loop.
    ioloop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

/// Tears down every connection still registered with the server.
///
/// Each map entry is released and connectDestroyed() is scheduled on the
/// loop that owns the connection, mirroring what removeConnectionInLoop()
/// does for a single connection. Without this, connections still present
/// at destruction time never receive connectDestroyed().
TcpServer::~TcpServer() {
    for (auto& entry : connections_) {
        // Hold a local reference so the connection outlives the map slot
        // until the bound task has captured its own copy.
        TcpConnectionptr conn(entry.second);
        entry.second.reset();
        conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
    }
}


//设置底层subloop个数
void TcpServer::setThreadNum(int numThreads) {
    threadpool_->setThreadNum(numThreads);
}

/// Starts the server: launches the thread pool with the stored thread-init
/// callback and schedules Accept::listen on the base loop.
///
/// Only the first call has any effect. The flag is tested and set in one
/// atomic exchange, so two concurrent callers cannot both observe 0 (the
/// previous separate compare and increment allowed that).
void TcpServer::start() {
    if (started_.exchange(1) == 0) {  // guard against being started more than once
        threadpool_->start(threadInitCallback_);
        loop_->runInLoop(std::bind(&Accept::listen, acceptor_.get()));
    }
}

void TcpServer::removeConnection(const TcpConnectionptr& conn) {
    loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

/// Not thread safe, but in loop
void TcpServer::removeConnectionInLoop(const TcpConnectionptr& conn) {
    connections_.erase(conn->name());
   
    EventLoop* ioLoop = conn->getLoop();
    ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

以上是关于缩略muduo库:TcpServer的主要内容,如果未能解决你的问题,请参考以下文章

缩略版muduo网络库:事件处理器 Channel

muduo网络库源码寻宝:TcpServer

muduo库中TcpServer一次完整的工作流程

缩略muduo网络库周边小代码

缩略muduo网络库周边小代码

缩略muduo库:事件循环 EventLoop