缩略muduo库:TcpConnection

Posted 看,未来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了缩略muduo库:TcpConnection相关的知识,希望对你有一定的参考价值。

最近VScode坏了,莫名其妙连不上虚拟机了,很难受。
已经判定不是Linux的问题,因为用cmd可以远程连接上。
所以这份就用VS先顶一下了,报了一堆的错也看不清楚。


callback.hpp

存放一些回调声明。

#pragma once

#include <cstddef>
#include <functional>
#include <memory>

// Forward declarations are enough here: every alias below only mentions these
// types through pointers, references or std::function signatures.
class Buffer;
class EventLoop;
class TcpConnection;
class timestamp;

// Shared handle through which connections are passed to every callback.
using TcpConnectionptr = std::shared_ptr<TcpConnection>;

// Callback taking only the connection; used for connection state changes.
using ConnectionCallback = std::function<void (const TcpConnectionptr&)>;
// Callback taking only the connection; used when a connection is closed.
using CloseCallback = std::function<void (const TcpConnectionptr&)>;
// Callback taking only the connection; used when pending output has been written.
using WriteCompleteCallback = std::function<void (const TcpConnectionptr&)>;
// Callback receiving the connection, its input buffer and a timestamp.
using MessageCallback =  std::function<void (const TcpConnectionptr&,Buffer*,timestamp)>;
// Callback receiving the connection and the number of bytes queued for output.
using HighWaterMarkCallback = std::function<void (const TcpConnectionptr&, std::size_t)>;
// Callback receiving an EventLoop pointer.
using ThreadInitCallback = std::function<void(EventLoop*)>;

tcpconnection.hpp

#pragma once

#include "nocopyable.hpp"
#include "InetAddr.hpp"
#include "callback.hpp"
#include "buffer.hpp"
#include "timestamp.hpp"

#include <memory>
#include <string>
#include <atomic>

// Forward declarations: within this header these types are only used through
// pointers / std::unique_ptr, so their full definitions are not required here.
class Channel;
class EventLoop;
class Socket;

//TcpServer 通过 Acceptor,当有一个新的accept函数拿到connfd,TCPConnection设置回调,转channel,进poller,调用channel的回调操作
class TcpConnection :public nocpoyable, public std::enable_shared_from_this<TcpConnection>
{ //得到当前对象的智能指针
public:
    using TcpConnectionPtr = std::shared_ptr<TcpConnection>;

    TcpConnection(EventLoop* loop,
        const std::string& name,
        int sockfd,
        const InetAddress& localaddr,
        const InetAddress& peeraddr);

    ~TcpConnection();

    EventLoop* getLoop() const { return loop_; }
    const std::string& name() const { return name_; }
    const InetAddress& localAddress() const { return localaddr_; }
    const InetAddress& peerAddress() const { return peeraddr_; }

    bool connected() const { return state_ == kConnected; }

    void send(const std::string &buf);
    void shutdown();

    void setConnectionCallback(const ConnectionCallback& cb)
    {
        connectionCallback_ = cb;
    }

    void setMessageCallback(const MessageCallback& cb)
    {
        messageCallback_ = cb;
    }

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    {
        writeCompleteCallback_ = cb;
    }

    void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
    {
        highWaterMarkCallback_ = cb;
        highWaterMark_ = highWaterMark;
    }

    void connectEstablished();
    void connectDestroyed();


private:
    enum State
    {
        kDisconnected,
        kConnecting,
        kConnected,
        kDisconnecting
    };

    void setState(State s) { state_ = s; }

    void handleRead(timestamp receiveTime);
    void handleWrite();
    void handleClose();
    void handleError();

    void sendInLoop(const void* message, size_t len);
    void shutdownInLoop();

    EventLoop* loop_; //存在于subloop中
    const std::string name_;
    std::atomic_int state_;
    bool reading_;

    std::unique_ptr<Socket> socket_;
    std::unique_ptr<Channel> channel_;

    const InetAddress localaddr_;
    const InetAddress peeraddr_;

    Buffer inputBuffer_;
    Buffer outputBuffer_;
    size_t highWaterMark_;

    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    ThreadInitCallback threadInitCallback_;
    HighWaterMarkCallback highWaterMarkCallback_;
    CloseCallback closeCallback_;
};

TcpConnection.cc

#include "logger.hpp"
#include "tcpconnection.hpp"
#include "eventloop.hpp"
#include "channel.hpp"
#include "socket.hpp"
#include "eventloop.hpp"

#include <errno.h>
#include <sys/socket.h>
#include <unistd.h>

#include <functional>
#include <string>

// Returns `loop` unchanged. A null pointer is reported through LOG_FATAL first;
// what LOG_FATAL does afterwards is defined in logger.hpp.
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
    if (loop == nullptr)
    {
        // This loop belongs to a TcpConnection, not to the server's main loop,
        // so the message names it accordingly.
        LOG_FATAL("%s:%s:%d TcpConnection loop is null!\n", __FILE__, __FUNCTION__, __LINE__);
    }
    return loop;
}

// Builds a connection in the kConnecting state around an already-connected fd.
//   loop      - event loop for this connection; validated by CheckLoopNotNull
//   nameArg   - connection name, used in log messages
//   sockfd    - connected descriptor; wrapped by both socket_ and channel_
//   localaddr - local endpoint address
//   peeraddr  - remote endpoint address
// The default high-water mark is 64 MiB.
TcpConnection::TcpConnection(EventLoop* loop,
    const std::string& nameArg,
    int sockfd,
    const InetAddress& localaddr,
    const InetAddress& peeraddr)
    : loop_(CheckLoopNotNull(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),
    channel_(new Channel(loop, sockfd)),
    localaddr_(localaddr),
    peeraddr_(peeraddr),
    highWaterMark_(64 * 1024 * 1024)
{
    // Route the channel's events to this object's handlers.
    channel_->setReadCallback(
        std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
    channel_->setWriteCallback(
        std::bind(&TcpConnection::handleWrite, this));
    channel_->setCloseCallback(
        std::bind(&TcpConnection::handleClose, this));
    channel_->setErrorCallback(
        std::bind(&TcpConnection::handleError, this));

    // %s needs a C string: passing the std::string object itself through
    // varargs is undefined behaviour. %p expects a void pointer.
    LOG_DEBUG("TcpConnection::ctor[%s] at %p fd= %d \n",
        name_.c_str(), static_cast<void*>(this), sockfd);

    //socket_->setKeepAlive(true);
}

// Nothing to release by hand: socket_ and channel_ are owned through
// std::unique_ptr and are destroyed with the object.
TcpConnection::~TcpConnection() = default;


// Read-event handler registered on channel_.
// Reads from the fd into inputBuffer_ and dispatches on the result:
//   n > 0  - data arrived, hand it to the message callback
//   n == 0 - treated as the peer closing the connection
//   n < 0  - read error, reported through handleError()
// NOTE(review): assumes Buffer::readFD follows read(2) return conventions and
// stores the failing errno in its second argument — confirm against buffer.hpp.
void TcpConnection::handleRead(timestamp receiveTime)
{
    int savedErrno = 0;
    const ssize_t n = inputBuffer_.readFD(channel_->fd(), &savedErrno);

    if (n > 0)
    {
        // shared_from_this() is a member inherited from
        // std::enable_shared_from_this, not a function in namespace std.
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
    }
    else if (n == 0)
    {
        handleClose();
    }
    else
    {
        errno = savedErrno;
        LOG_ERROR("TcpConnection::handleRead\n");
        handleError();
    }
}

void TcpConnection::handleWrite()
{
    if (channel_->isWriting())
    {
        int saveErrno = 0;
        ssize_t n = outputBuffer_.writeFD(channel_->fd(), &saveErrno);

        if (n > 0)
        {
            outputBuffer_.retrieve(n);

            if (outputBuffer_.readablebuffer() == 0)
            {
                channel_->disableWriting();
                if (writeCompleteCallback_)
                {
                    loop_->queueInLoop(std::bind(writeCompleteCallback_, std::shared_from_this()));
                }
                if (state_ == kDisconnecting)
                {
                    shutdownInLoop();
                }
            }
        }
        else
        {
            LOG_ERROR("TcpConnection::handleRead\\n");
        }
    }
    else
    {
        LOG_ERROR("Connection fd = %d is done\\n", channel_->fd());
    }
}

void TcpConnection::handleClose()
{
    setState(kDisconnected);
    channel_->disableAll();

    TcpConnectionPtr connptr(std::shared_from_this());
    connectionCallback_(connptr);

    closeCallback_(connptr);
}

// Error-event handler registered on channel_ (also called by handleRead() on a
// failed read). Fetches the socket's pending error with SO_ERROR and logs it.
void TcpConnection::handleError()
{
    int optval = 0;
    socklen_t optlen = static_cast<socklen_t>(sizeof optval);
    int err = 0;

    // getsockopt takes (fd, level, option name, value pointer, length pointer).
    if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
    {
        // The query itself failed; report why.
        err = errno;
    }
    else
    {
        // The query succeeded; optval holds the socket's pending error.
        err = optval;
    }

    LOG_ERROR("TcpConnection::handleError [%s] - SO_ERROR = %d\n", name_.c_str(), err);
}


// Sends buf over the connection; does nothing unless the state is kConnected.
// When called on the loop's own thread the data is written directly, otherwise
// the write is handed to the loop through runInLoop().
void TcpConnection::send(const std::string &buf)
{
    if (state_ != kConnected)
    {
        return;
    }

    if (loop_->isInLoopThread())
    {
        sendInLoop(buf.data(), buf.size());
    }
    else
    {
        // Capture a copy of the payload: the caller's string may be gone by the
        // time the loop runs this task, so a raw pointer into it would dangle.
        loop_->runInLoop([this, buf]() { sendInLoop(buf.data(), buf.size()); });
    }
}

//应用写的快,内核写得慢,所以需要缓冲区,且设置水位线,防止写入过快
void TcpConnection::sendInLoop(const void* message, size_t len)
{
    ssize_t wrote = 0;
    ssize_t remaining = len;
    bool faulterr = false;

    if (state_ == kDisconnected) {
        LOG_ERROR("disconnected,give up writing! \\n");
        return;
    }

    //channel第一次开始写数据,而且缓冲区没有待发送数据
    if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
        wrote = ::write(channel_->fd(), message, len);

        if (wrote >= 0) {
            remaining = len - wrote;
            
            //数据一次性发送完成,不用再对poller设置epollout事件
            if (remaining == 0 && WriteCompleteCallback_) {
                loop_->queueInLoop(std::bind(WriteCompleteCallback_,shared_from_this()));
            }
        }
        else {
            wrote = 0;

            if (errno != EWOULDBLOCK) {
                LOG_ERROR("TcpConnect::sendInLoop\\n");

                if (errno == EPIPE || errno == ECONNRESET) {
                    faulterr = true;
                }
            }
            
        }
    }

    //说明这次write没有把数据全部发送,剩余数据需要保存到缓冲区中,
    //给channel注册epollout事件,poller会通知相应sock->channel调用相应回调方法
    if (!faulterr && wrote > 0) {
        size_t oldLen = outputBuffer_.readableBytes();
        if (oldLen + remaining >= highWaterMark_
            && oldLen < highWaterMark_
            && highWaterMarkCallback_)
        {
            loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
        }
        outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
        if (!channel_->isWriting())
        {
            channel_->enableWriting();
        }
    }
}

void TcpConnection::connectEstablished()
{
    setState(kConnected);
    channel_->tie(shared_from_this());
    channel_->enableReading();

    connectionCallback_(shared_from_this());
}

void TcpConnection::connectDestroyed()
{
    if (state_ == kConnected) {
        setState(kDisconnected);
        channel_->disableAll();
        ConnectionCallback_(shared_from_this());
    }
    channel_->remove();
}

void TcpConnection::shutdown()
{
    if (state_ == kConnected){
        setState(kDisconnecting);
        // FIXME: shared_from_this()?
        loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
    }
}

// Loop-side half of shutdown(): shuts down the write side of the socket unless
// the channel is still writing. In that case handleWrite() calls this again
// once the output buffer has drained and the state is kDisconnecting.
void TcpConnection::shutdownInLoop()
{
    const bool stillWriting = channel_->isWriting();
    if (stillWriting)
    {
        return;
    }

    // we are not writing
    socket_->shutdownWrite();
}

以上是关于缩略muduo库:TcpConnection的主要内容,如果未能解决你的问题,请参考以下文章

缩略muduo网络库周边小代码

缩略muduo网络库周边小代码

缩略muduo库:TcpServer

缩略muduo库:事件循环 EventLoop

缩略muduo库:事件循环 EventLoop

缩略muduo库:Buffer 缓冲区