缩略muduo库:TcpConnection
Posted 看,未来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了缩略muduo库:TcpConnection相关的知识,希望对你有一定的参考价值。
最近VScode坏了,莫名其妙连不上虚拟机了,很难受。
已经判定不是Linux的问题,因为用cmd可以远程连接上。
所以这份就用VS先顶一下了,报了一堆的错也看不清楚。
CallBack.hpp
存放一些回调声明。
#pragma once
#include <functional>
class Buffer;
class TcpConnection;
class timestamp;
using TcpConnectionptr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void (const TcpConnectionptr&)>;
using CloseCallback = std::function<void (const TcpConnectionptr&)>;
using WriteCompleteCallback = std::function<void (const TcpConnectionptr&)>;
using MessageCallback = std::function<void (const TcpConnectionptr&,Buffer*,timestamp)>;
using ThreadInitCallback = std::function<void(EventLoop*)>;
TcpConnection.hpp
#pragma once
#include "nocopyable.hpp"
#include "InetAddr.hpp"
#include "callback.hpp"
#include "buffer.hpp"
#include "timestamp.hpp"
#include <memory>
#include <string>
#include <atomic>
class Channel;
class EventLoop;
class Socket;
//TcpServer 通过 Acceptor,当有一个新的accept函数拿到connfd,TCPConnection设置回调,转channel,进poller,调用channel的回调操作
class TcpConnection :public nocpoyable, public std::enable_shared_from_this<TcpConnection>
{ //得到当前对象的智能指针
public:
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
TcpConnection(EventLoop* loop,
const std::string& name,
int sockfd,
const InetAddress& localaddr,
const InetAddress& peeraddr);
~TcpConnection();
EventLoop* getLoop() const { return loop_; }
const std::string& name() const { return name_; }
const InetAddress& localAddress() const { return localaddr_; }
const InetAddress& peerAddress() const { return peeraddr_; }
bool connected() const { return state_ == kConnected; }
void send(const std::string &buf);
void shutdown();
void setConnectionCallback(const ConnectionCallback& cb)
{
connectionCallback_ = cb;
}
void setMessageCallback(const MessageCallback& cb)
{
messageCallback_ = cb;
}
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{
writeCompleteCallback_ = cb;
}
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{
highWaterMarkCallback_ = cb;
highWaterMark_ = highWaterMark;
}
void connectEstablished();
void connectDestroyed();
private:
enum State
{
kDisconnected,
kConnecting,
kConnected,
kDisconnecting
};
void setState(State s) { state_ = s; }
void handleRead(timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();
EventLoop* loop_; //存在于subloop中
const std::string name_;
std::atomic_int state_;
bool reading_;
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localaddr_;
const InetAddress peeraddr_;
Buffer inputBuffer_;
Buffer outputBuffer_;
size_t highWaterMark_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
};
TcpConnection.cc
#include "logger.hpp"
#include "tcpconnection.hpp"
#include "eventloop.hpp"
#include "channel.hpp"
#include "socket.hpp"
#include "eventloop.hpp"
#include <functional>
#include <errno.h>
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{
if (loop == nullptr)
{
LOG_FATAL("%s:%s:%d mainloop is null!\\n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
TcpConnection::TcpConnection(EventLoop* loop,
const std::string& nameArg,
int sockfd,
const InetAddress& localaddr,
const InetAddress& peeraddr)
: loop_(CheckLoopNotNull(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localaddr_(localaddr),
peeraddr_(peeraddr),
highWaterMark_(64 * 1024 * 1024)
{
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_DEBUG("TcpConnection::ctor[%s] at %p fd= %d \\n", name_, this, sockfd);
//socket_->setKeepAlive(true);
}
TcpConnection::~TcpConnection()
{}
void TcpConnection::handleRead(timestamp receiveTime)
{
int saveerrno = 0;
ssize_t n = inputBuffer_.readFD(channel_->fd(), &saveerrno);
if (n > 0)
{
messageCallback_(std::shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
errno = saveerrno;
LOG_ERROR("TcpConnection::handleRead\\n");
handleError();
}
}
void TcpConnection::handleWrite()
{
if (channel_->isWriting())
{
int saveErrno = 0;
ssize_t n = outputBuffer_.writeFD(channel_->fd(), &saveErrno);
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readablebuffer() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, std::shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleRead\\n");
}
}
else
{
LOG_ERROR("Connection fd = %d is done\\n", channel_->fd());
}
}
void TcpConnection::handleClose()
{
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr connptr(std::shared_from_this());
connectionCallback_(connptr);
closeCallback_(connptr);
}
void TcpConnection::handleError()
{
int optval;
socklen_t optlen = sizeof optval;
int err;
if (::getsockopt(channel_->fd(), SOL_SOCKET, optval, &optlen) < 0) {
err = errno;
}
else {
err = 0;
}
LOG_ERROR("TcpConnection::handleError [%s] - SO_ERROR = %d\\n", name_.c_str(), err);
}
void TcpConnection::send(const std::string &buf) {
if (state_ == kConnected) {
if(loop_->isInLoopThread()) {
sendInLoop(buf.c_str(),buf.size());
}
else {
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));
}
}
}
//应用写的快,内核写得慢,所以需要缓冲区,且设置水位线,防止写入过快
void TcpConnection::sendInLoop(const void* message, size_t len)
{
ssize_t wrote = 0;
ssize_t remaining = len;
bool faulterr = false;
if (state_ == kDisconnected) {
LOG_ERROR("disconnected,give up writing! \\n");
return;
}
//channel第一次开始写数据,而且缓冲区没有待发送数据
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
wrote = ::write(channel_->fd(), message, len);
if (wrote >= 0) {
remaining = len - wrote;
//数据一次性发送完成,不用再对poller设置epollout事件
if (remaining == 0 && WriteCompleteCallback_) {
loop_->queueInLoop(std::bind(WriteCompleteCallback_,shared_from_this()));
}
}
else {
wrote = 0;
if (errno != EWOULDBLOCK) {
LOG_ERROR("TcpConnect::sendInLoop\\n");
if (errno == EPIPE || errno == ECONNRESET) {
faulterr = true;
}
}
}
}
//说明这次write没有把数据全部发送,剩余数据需要保存到缓冲区中,
//给channel注册epollout事件,poller会通知相应sock->channel调用相应回调方法
if (!faulterr && wrote > 0) {
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading();
connectionCallback_(shared_from_this());
}
void TcpConnection::connectDestroyed()
{
if (state_ == kConnected) {
setState(kDisconnected);
channel_->disableAll();
ConnectionCallback_(shared_from_this());
}
channel_->remove();
}
void TcpConnection::shutdown()
{
if (state_ == kConnected){
setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}
void TcpConnection::shutdownInLoop()
{
if (!channel_->isWriting())
{
// we are not writing
socket_->shutdownWrite();
}
}
以上是关于缩略muduo库:TcpConnection的主要内容,如果未能解决你的问题,请参考以下文章