Skip to main content

muduo项目解析

1.noncopyable类

noncopyable被继承以后,派生类对象可以正常的构造和析构,但是派生类对象无法进行拷贝构造和赋值操作

noncopyable.h

#pragma once

/**
* noncopyable被继承以后,派生类对象可以正常的构造和析构,但是派生类对象
* 无法进行拷贝构造和赋值操作
*/
/**
 * Inherit from noncopyable to make a derived class non-copyable:
 * derived objects can still be constructed and destroyed normally,
 * but copy construction and copy assignment are disabled.
 */
class noncopyable
{
public:
    // Deleting the copy operations disables copying for every derived class.
    noncopyable(const noncopyable&) = delete;
    noncopyable& operator=(const noncopyable&) = delete;

protected:
    // Construction/destruction remain available to derived classes only.
    noncopyable() = default;
    ~noncopyable() = default;
};

2.Logger日志类

日志类的主要作用是实现日志输出,日志类是一个单例类,instance()方法就是获取日志的唯一实例对象。

使用宏定义方便使用(无需构造类)

#define LOG_INFO(logmsgFormat, ...) 中logmsgFormat是参数字符串,...表示可变参数。

多行宏定义使用do {}while(0) 防止错误,每行宏定义需要加"\"处理。

其中 ##__VA_ARGS__ 为上面的 ... 参数值。

snprintf()函数将字符格式化。

#ifdef MUDEBUG
//若定义了MUDEBUG则执行这里面的内容,可以实现选择性打印某些日志信息
#else
//若没有定义MUDEBUG则执行这里面的内容
#endif

Logger.h

#pragma once

#include <string>

#include "noncopyable.h"

// LOG_INFO("%s %d", arg1, arg2)
// Fetch the singleton logger, set the level, format the message into a
// stack buffer, and emit it. The do{}while(0) wrapper makes the macro
// behave as a single statement (safe inside an un-braced if/else).
#define LOG_INFO(logmsgFormat, ...) \
do \
{ \
Logger &logger = Logger::instance(); \
logger.setLogLevel(INFO); \
char buf[1024] = {0}; \
snprintf(buf, 1024, logmsgFormat, ##__VA_ARGS__); \
logger.log(buf); \
} while(0)

// Same as LOG_INFO but at ERROR level.
#define LOG_ERROR(logmsgFormat, ...) \
do \
{ \
Logger &logger = Logger::instance(); \
logger.setLogLevel(ERROR); \
char buf[1024] = {0}; \
snprintf(buf, 1024, logmsgFormat, ##__VA_ARGS__); \
logger.log(buf); \
} while(0)

// FATAL logs the message and then terminates the process.
#define LOG_FATAL(logmsgFormat, ...) \
do \
{ \
Logger &logger = Logger::instance(); \
logger.setLogLevel(FATAL); \
char buf[1024] = {0}; \
snprintf(buf, 1024, logmsgFormat, ##__VA_ARGS__); \
logger.log(buf); \
exit(-1); \
} while(0)

// LOG_DEBUG only emits when compiled with -DMUDEBUG; otherwise it expands
// to nothing, so debug logging costs nothing in release builds.
#ifdef MUDEBUG
#define LOG_DEBUG(logmsgFormat, ...) \
do \
{ \
Logger &logger = Logger::instance(); \
logger.setLogLevel(DEBUG); \
char buf[1024] = {0}; \
snprintf(buf, 1024, logmsgFormat, ##__VA_ARGS__); \
logger.log(buf); \
} while(0)
#else
#define LOG_DEBUG(logmsgFormat, ...)
#endif

// Log levels: INFO ERROR FATAL DEBUG
enum LogLevel
{
INFO, // ordinary information
ERROR, // error information
FATAL, // fatal information (LOG_FATAL exits the process)
DEBUG, // debug information
};

// Singleton log-output class.
class Logger : noncopyable
{
public:
// Get the unique logger instance.
static Logger& instance();
// Set the level stamped onto the next message (takes a LogLevel value).
void setLogLevel(int level);
// Write one log line.
void log(std::string msg);
private:
int logLevel_; // level used when formatting the current message
};

Logger.cc

#include "Logger.h"
#include "Timestamp.h"

#include <iostream>

// Get the unique logger instance (Meyers singleton; thread-safe init in C++11).
Logger& Logger::instance()
{
static Logger logger;
return logger;
}

// Set the log level.
void Logger::setLogLevel(int level)
{
logLevel_ = level;
}

// Write one log line in the form: [LEVEL] time : msg
void Logger::log(std::string msg)
{
switch (logLevel_)
{
case INFO:
std::cout << "[INFO]";
break;
case ERROR:
std::cout << "[ERROR]";
break;
case FATAL:
std::cout << "[FATAL]";
break;
case DEBUG:
std::cout << "[DEBUG]";
break;
default:
break;
}

// Print the timestamp and the message body.
std::cout << Timestamp::now().toString() << " : " << msg << std::endl;
}

3.Timestamp时间戳类

主要功能为提供时间的输出

explicit 是防止隐式类型转化,避免赋值的是时候进行隐式类型转化。

Timestamp.h

#pragma once

#include <iostream>
#include <string>

// Timestamp class: wraps an epoch time value and formats it for log output.
class Timestamp
{
public:
Timestamp();
// explicit prevents implicit conversion from int64_t to Timestamp.
explicit Timestamp(int64_t microSecondsSinceEpoch);
// NOTE(review): now() stores time(NULL), i.e. SECONDS since the epoch,
// despite the member name saying microseconds — confirm intent.
static Timestamp now();
std::string toString() const;
private:
int64_t microSecondsSinceEpoch_;
};

Timestamp.cc

#include "Timestamp.h"

#include <time.h>

// Default: epoch value 0.
Timestamp::Timestamp() : microSecondsSinceEpoch_(0) {}

Timestamp::Timestamp(int64_t microSecondsSinceEpoch)
    : microSecondsSinceEpoch_(microSecondsSinceEpoch)
{}

// Current time. NOTE(review): time(NULL) yields SECONDS, although the
// member name mentions microseconds; toString() below also treats the
// stored value as seconds, so the two agree.
Timestamp Timestamp::now()
{
    return Timestamp(time(NULL));
}

// Format as "YYYY/MM/DD hh:mm:ss" in the local time zone.
std::string Timestamp::toString() const
{
    char buf[128] = {0};
    // Fix: use the reentrant localtime_r instead of localtime —
    // localtime returns a pointer to shared static storage and is
    // not thread-safe, and this logger may be used from many threads.
    time_t seconds = static_cast<time_t>(microSecondsSinceEpoch_);
    tm tm_time;
    localtime_r(&seconds, &tm_time);
    snprintf(buf, 128, "%4d/%02d/%02d %02d:%02d:%02d",
             tm_time.tm_year + 1900,
             tm_time.tm_mon + 1,
             tm_time.tm_mday,
             tm_time.tm_hour,
             tm_time.tm_min,
             tm_time.tm_sec);
    return buf;
}

// #include <iostream>
// int main()
// {
// std::cout << Timestamp::now().toString() << std::endl;
// return 0;
// }

4.InetAddress类

封装socket地址类型

InetAddress.h

#pragma once

#include <arpa/inet.h>
#include <netinet/in.h>
#include <string>

// Wraps an IPv4 socket address (sockaddr_in).
class InetAddress
{
public:
// Build from port + dotted-decimal ip; explicit blocks implicit conversions.
explicit InetAddress(uint16_t port = 0, std::string ip = "127.0.0.1");
explicit InetAddress(const sockaddr_in &addr)
: addr_(addr)
{}

std::string toIp() const; // "a.b.c.d"
std::string toIpPort() const; // "a.b.c.d:port"
uint16_t toPort() const; // port in host byte order

const sockaddr_in* getSockAddr() const {return &addr_;}
void setSockAddr(const sockaddr_in &addr) { addr_ = addr; }
private:
sockaddr_in addr_;
};

InetAddress.cc

#include "InetAddress.h"

#include <strings.h>
#include <string.h>

// Construct from port and dotted-decimal ip (defaults live in the header).
InetAddress::InetAddress(uint16_t port, std::string ip)
{
    bzero(&addr_, sizeof addr_);
    addr_.sin_family = AF_INET;
    addr_.sin_port = htons(port); // host -> network byte order
    addr_.sin_addr.s_addr = inet_addr(ip.c_str());
}

// Return the IP as a dotted-decimal string.
std::string InetAddress::toIp() const
{
    char buf[64] = {0};
    ::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);
    return buf;
}

// Return "ip:port".
std::string InetAddress::toIpPort() const
{
    char buf[64] = {0};
    ::inet_ntop(AF_INET, &addr_.sin_addr, buf, sizeof buf);
    size_t end = strlen(buf);
    uint16_t port = ntohs(addr_.sin_port);
    // Fix: bounded snprintf instead of sprintf so the append can never
    // write past the 64-byte buffer.
    snprintf(buf + end, sizeof buf - end, ":%u", port);
    return buf;
}

// Return the port in host byte order.
uint16_t InetAddress::toPort() const
{
    return ntohs(addr_.sin_port);
}

// #include <iostream>
// int main()
// {
// InetAddress addr(8080);
// std::cout << addr.toIpPort() << std::endl;

// return 0;
// }

5.Channel类

Channel类是为了对读写事件和对应fd的封装

Channel.h

using定义别名 == typedef 重定义一个类型

tie 函数: 用弱智能指针判断channel是否已经被remove掉,防止调用handleEvent的时候Channel已经释放。这个tie函数会在TcpConnection连接建立成功的回调函数里被调用。

#pragma once

#include "noncopyable.h"
#include "Timestamp.h"

#include <functional>
#include <memory>

class EventLoop;

/**
* 理清楚 EventLoop、Channel、Poller之间的关系 《= Reactor模型上对应 Demultiplex
* Channel 理解为通道,封装了sockfd和其感兴趣的event,如EPOLLIN、EPOLLOUT事件
* 还绑定了poller返回的具体事件
*/
/**
 * A Channel wraps one fd together with the events it is interested in
 * (e.g. EPOLLIN / EPOLLOUT), records the events the poller actually
 * reported, and dispatches those to user-installed callbacks.
 */
class Channel : noncopyable
{
public:
    using EventCallback = std::function<void()>;
    using ReadEventCallback = std::function<void(Timestamp)>;

    Channel(EventLoop *loop, int fd);
    ~Channel();

    // Called after the poller reports events on fd_; dispatches to callbacks.
    void handleEvent(Timestamp receiveTime);

    // Install the callback objects.
    void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
    void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
    void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
    void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }

    // Guard against running callbacks after the channel's owner has been
    // destroyed (see tie_/tied_ below).
    void tie(const std::shared_ptr<void>&);

    int fd() const { return fd_; }
    int events() const { return events_; }
    // Fix: was declared `int` but returned nothing — undefined behavior
    // if the (nonexistent) return value were used; a setter returns void.
    void set_revents(int revt) { revents_ = revt; }

    // Change the set of events fd_ is interested in (re-registers with the poller).
    void enableReading() { events_ |= kReadEvent; update(); }
    void disableReading() { events_ &= ~kReadEvent; update(); }
    void enableWriting() { events_ |= kWriteEvent; update(); }
    void disableWriting() { events_ &= ~kWriteEvent; update(); }
    void disableAll() { events_ = kNoneEvent; update(); }

    // Query the current interest state.
    bool isNoneEvent() const { return events_ == kNoneEvent; }
    bool isWriting() const { return events_ & kWriteEvent; }
    bool isReading() const { return events_ & kReadEvent; }

    // Poller bookkeeping state (kNew / kAdded / kDeleted in EPollPoller.cc).
    int index() { return index_; }
    void set_index(int idx) { index_ = idx; }

    // one loop per thread
    EventLoop* ownerLoop() { return loop_; }
    void remove();

private:
    void update();
    void handleEventWithGuard(Timestamp receiveTime);

    static const int kNoneEvent;
    static const int kReadEvent;
    static const int kWriteEvent;

    EventLoop *loop_; // the event loop this channel belongs to
    const int fd_;    // the fd the Poller watches
    int events_;      // events we registered interest in
    int revents_;     // events the poller actually reported
    int index_;       // poller bookkeeping state
    // Weak reference to the owner object; promoted before running callbacks
    // to detect whether the owner is still alive.
    std::weak_ptr<void> tie_;
    bool tied_;

    // The channel knows which events occurred (revents_), so it is the one
    // that invokes the concrete callbacks.
    ReadEventCallback readCallback_;
    EventCallback writeCallback_;
    EventCallback closeCallback_;
    EventCallback errorCallback_;
};


Channel.cc

tied_ 只有在 tie 方法被调用之后才为 true,而 tie 方法是由上层 TcpConnection 在连接建立成功的回调 connectEstablished 中调用的。也就是说:连接建立好时,如果 TcpConnection 还存在,表明连接状态良好,此时在 handleEvent 中执行回调是安全的。如果 TcpConnection 因为某种缘故断开,weak_ptr 提升失败,再处理这个事件已经没有意义——数据也发送不回去了,上层的 shared_ptr 已随 TcpConnection 一起销毁,所以作者的写法是直接不处理(没有 else 分支),Channel 也会随 TcpConnection 一同释放。handleEventWithGuard 则负责根据发生的具体事件调用对应的回调。

//通过提升为强智能指针看资源是否被释放
std::shared_ptr<void> guard = tie_.lock();
#include "Channel.h"
#include "EventLoop.h"
#include "Logger.h"

#include <sys/epoll.h>

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;

// EventLoop owns the ChannelList and the Poller.
Channel::Channel(EventLoop *loop, int fd)
: loop_(loop), fd_(fd), events_(0), revents_(0), index_(-1), tied_(false)
{
}

Channel::~Channel()
{
}

// When is tie() called? When a new TcpConnection is created (TcpConnection => Channel).
// It prevents running handleEvent callbacks after the Channel's owner has
// been released; invoked from TcpConnection's connection-established callback.
void Channel::tie(const std::shared_ptr<void> &obj)
{
tie_ = obj;
tied_ = true;
}

/**
 * After changing the events this channel's fd cares about, update() asks the
 * poller to apply the change via epoll_ctl.
 * Path: Channel => EventLoop => Poller.
 */
void Channel::update()
{
// Go through the owning EventLoop, which forwards to the poller.
loop_->updateChannel(this);
}

// Remove this channel from the EventLoop it belongs to.
void Channel::remove()
{
loop_->removeChannel(this);
}

// Entry point once the poller has reported events on this fd.
void Channel::handleEvent(Timestamp receiveTime)
{
if (tied_)
{
// Promote the weak_ptr: only run callbacks while the owner is alive.
std::shared_ptr<void> guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
// No else branch: if promotion fails the owner (TcpConnection) is gone
// and handling the event would be pointless.
}
else
{
handleEventWithGuard(receiveTime);
}
}

// Dispatch to the concrete callbacks according to the events the poller reported.
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
LOG_INFO("channel handleEvent revents:%d\n", revents_);

// Peer hung up and there is nothing left to read: treat as close.
if ((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN))
{
if (closeCallback_)
{
closeCallback_();
}
}

if (revents_ & EPOLLERR)
{
if (errorCallback_)
{
errorCallback_();
}
}

if (revents_ & (EPOLLIN | EPOLLPRI))
{
if (readCallback_)
{
readCallback_(receiveTime);
}
}

if (revents_ & EPOLLOUT)
{
if (writeCallback_)
{
writeCallback_();
}
}
}

6.Poller抽象

// Poller是多路事件分发器的核心IO复用模块

Poller.h

#pragma once

#include "noncopyable.h"
#include "Timestamp.h"

#include <vector>
#include <unordered_map>

class Channel;
class EventLoop;

// Core IO-multiplexing module of muduo's event dispatcher (Demultiplex role).
class Poller : noncopyable
{
public:
using ChannelList = std::vector<Channel*>;

Poller(EventLoop *loop);
virtual ~Poller() = default;

// Uniform interface that every IO-multiplexing backend implements.
virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;
virtual void updateChannel(Channel *channel) = 0;
virtual void removeChannel(Channel *channel) = 0;

// Whether the given channel is registered with this poller.
bool hasChannel(Channel *channel) const;

// Factory the EventLoop uses to obtain the default backend implementation.
static Poller* newDefaultPoller(EventLoop *loop);
protected:
// key: sockfd, value: the Channel that owns that sockfd
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap channels_;
private:
EventLoop *ownerLoop_; // the EventLoop this poller belongs to
};

Poller.cc


#include "Poller.h"
#include "Channel.h"

// Remember which EventLoop owns this poller.
Poller::Poller(EventLoop *loop)
    : ownerLoop_(loop)
{
}

// A channel belongs to this poller only if its fd maps to that exact object.
bool Poller::hasChannel(Channel *channel) const
{
    auto found = channels_.find(channel->fd());
    if (found == channels_.end())
    {
        return false;
    }
    return found->second == channel;
}

为了避免在基类 Poller 的实现文件中创建派生类的实例(否则基类源文件就必须包含派生类的头文件,造成反向依赖),将 newDefaultPoller 这个工厂函数单独放在一个源文件中实现。

//getenv是获取当前的环境变量
::getenv("MUDUO_USE_POLL")

DefaultPoller.cc

#include "Poller.h"
#include "EPollPoller.h"

#include <stdlib.h>

// Choose the IO-multiplexing backend. Lives in its own file so the base
// class's implementation file never has to include a derived class header.
Poller* Poller::newDefaultPoller(EventLoop *loop)
{
if (::getenv("MUDUO_USE_POLL"))
{
return nullptr; // poll(2) backend not implemented — caller receives nullptr
}
else
{
return new EPollPoller(loop); // default: epoll backend
}
}

7.EpollPoller 类

Poller的实现,用于Channel事件的注册修改删除以及监听。使用的原型函数epoll

virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0; //epoll_wait()
virtual void updateChannel(Channel *channel) = 0; //epoll_ctl() 添加事件/更改事件
virtual void removeChannel(Channel *channel) = 0; //epoll_ctl() 从epoll中删除该fd关注的事件,并从ChannelMap中移除对应的Channel

EPollPoller.h

#pragma once

#include "Poller.h"
#include "Timestamp.h"

#include <vector>
#include <sys/epoll.h>

class Channel;

/**
 * epoll-based Poller implementation, built on:
 * epoll_create
 * epoll_ctl add/mod/del
 * epoll_wait
 */
class EPollPoller : public Poller
{
public:
EPollPoller(EventLoop *loop);
~EPollPoller() override;

// Overrides of Poller's abstract interface.
Timestamp poll(int timeoutMs, ChannelList *activeChannels) override;
void updateChannel(Channel *channel) override;
void removeChannel(Channel *channel) override;
private:
static const int kInitEventListSize = 16; // initial size of events_

// Fill activeChannels with the channels epoll_wait reported.
void fillActiveChannels(int numEvents, ChannelList *activeChannels) const;
// Apply one epoll_ctl operation (add/mod/del) for the channel.
void update(int operation, Channel *channel);

using EventList = std::vector<epoll_event>;

int epollfd_;
EventList events_; // output buffer handed to epoll_wait
};

EPollPoller.cc

#include "EPollPoller.h"
#include "Logger.h"
#include "Channel.h"

#include <errno.h>
#include <unistd.h>
#include <strings.h>

// Channel not yet added to the poller
const int kNew = -1; // matches Channel's initial index_ of -1
// Channel currently registered in the epoll set
const int kAdded = 1;
// Channel removed from the epoll set (still tracked in channels_)
const int kDeleted = 2;

EPollPoller::EPollPoller(EventLoop *loop)
: Poller(loop)
, epollfd_(::epoll_create1(EPOLL_CLOEXEC))
, events_(kInitEventListSize) // vector<epoll_event>
{
if (epollfd_ < 0)
{
LOG_FATAL("epoll_create error:%d \n", errno);
}
}

EPollPoller::~EPollPoller()
{
::close(epollfd_);
}

// Wait for events and fill activeChannels with the channels that fired.
// Returns the timestamp taken right after epoll_wait returned.
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels)
{
    // On a hot path this should arguably be LOG_DEBUG rather than LOG_INFO.
    LOG_INFO("func=%s => fd total count:%lu \n", __FUNCTION__, channels_.size());

    int numEvents = ::epoll_wait(epollfd_, events_.data(), static_cast<int>(events_.size()), timeoutMs);
    int saveErrno = errno; // save errno before any later call can clobber it
    Timestamp now(Timestamp::now());

    if (numEvents > 0)
    {
        LOG_INFO("%d events happened \n", numEvents);
        fillActiveChannels(numEvents, activeChannels);
        // The event list came back full: double it so a later poll can
        // report more fds at once. (Cast fixes the signed/unsigned compare.)
        if (static_cast<size_t>(numEvents) == events_.size())
        {
            events_.resize(events_.size() * 2);
        }
    }
    else if (numEvents == 0)
    {
        LOG_DEBUG("%s timeout! \n", __FUNCTION__);
    }
    else
    {
        // EINTR (interrupted by a signal) is benign; anything else is an error.
        if (saveErrno != EINTR)
        {
            errno = saveErrno;
            LOG_ERROR("EPollPoller::poll() err!");
        }
    }
    return now;
}

// channel update remove => EventLoop updateChannel removeChannel => Poller updateChannel removeChannel
/**
* EventLoop => poller.poll
* ChannelList Poller
* ChannelMap <fd, channel*> epollfd
*/
// Register a new channel or modify/delete an existing one, according to
// its bookkeeping index (kNew / kAdded / kDeleted).
void EPollPoller::updateChannel(Channel *channel)
{
    const int index = channel->index();
    LOG_INFO("func=%s => fd=%d events=%d index=%d \n", __FUNCTION__, channel->fd(), channel->events(), index);

    if (index == kNew || index == kDeleted)
    {
        if (index == kNew)
        {
            // First time we see this channel: track it by fd.
            int fd = channel->fd();
            channels_[fd] = channel;
        }

        channel->set_index(kAdded);
        update(EPOLL_CTL_ADD, channel);
    }
    else // the channel is already registered with this poller
    {
        // Fix: dropped an unused local (`int fd = channel->fd();`) that
        // the original declared here but never read.
        if (channel->isNoneEvent())
        {
            // No interest left: take it out of the epoll set (but keep it
            // in channels_ so it can be re-added later).
            update(EPOLL_CTL_DEL, channel);
            channel->set_index(kDeleted);
        }
        else
        {
            update(EPOLL_CTL_MOD, channel);
        }
    }
}

// Remove the channel from the poller entirely: erase it from the fd map
// and, if it is currently in the epoll set, delete it there too.
void EPollPoller::removeChannel(Channel *channel)
{
int fd = channel->fd();
channels_.erase(fd);

LOG_INFO("func=%s => fd=%d\n", __FUNCTION__, fd);

int index = channel->index();
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
channel->set_index(kNew);
}

// Copy the channels epoll_wait reported into activeChannels.
void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const
{
for (int i=0; i < numEvents; ++i)
{
// data.ptr was set to the Channel* in update(), so recover it here.
Channel *channel = static_cast<Channel*>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel); // EventLoop receives the full list of channels with pending events
}
}

// Apply one epoll_ctl operation (add/mod/del) for the channel's fd.
void EPollPoller::update(int operation, Channel *channel)
{
    epoll_event event;
    bzero(&event, sizeof event);

    int fd = channel->fd();

    event.events = channel->events();
    // Fix: epoll_data is a union — the original assigned event.data.fd
    // and then immediately overwrote it with event.data.ptr. Only the
    // channel pointer is stored, matching what fillActiveChannels reads.
    event.data.ptr = channel;

    if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
    {
        if (operation == EPOLL_CTL_DEL)
        {
            // A failed delete is recoverable: log and continue.
            LOG_ERROR("epoll_ctl del error:%d\n", errno);
        }
        else
        {
            // A failed add/modify leaves the loop broken: abort.
            LOG_FATAL("epoll_ctl add/mod error:%d\n", errno);
        }
    }
}

8.CurrentThread类

此类用于获取当前线程的id:Tid

CurrentThread.h

#pragma once

#include <unistd.h>
#include <sys/syscall.h>

namespace CurrentThread
{
// __thread: one instance per thread — a thread-local cache of the tid.
extern __thread int t_cachedTid;

void cacheTid();

// Return the calling thread's tid, caching it on first use.
inline int tid()
{
if (__builtin_expect(t_cachedTid == 0, 0)) // unlikely: true only on the first call
{
cacheTid();
}
return t_cachedTid;
}
}

CurrentThread.cc

#include "CurrentThread.h"

namespace CurrentThread
{
    __thread int t_cachedTid = 0;

    // Fetch and cache the kernel thread id for the calling thread.
    void cacheTid()
    {
        if (t_cachedTid != 0)
        {
            return; // already cached for this thread
        }
        // Linux system call: obtain the current thread's tid.
        t_cachedTid = static_cast<pid_t>(::syscall(SYS_gettid));
    }
}

9.EventLoop类

EventLoop.h

#pragma once

#include <functional>
#include <vector>
#include <atomic>
#include <memory>
#include <mutex>

#include "noncopyable.h"
#include "Timestamp.h"
#include "CurrentThread.h"

class Channel;
class Poller;

// Event loop class. Coordinates the two main modules: Channel and Poller
// (the abstraction over epoll).
class EventLoop : noncopyable
{
public:
using Functor = std::function<void()>;

EventLoop();
~EventLoop();

// Start the event loop.
void loop();
// Quit the event loop.
void quit();

Timestamp pollReturnTime() const { return pollReturnTime_; }

// Run cb in this loop's thread (immediately when already on it).
void runInLoop(Functor cb);
// Queue cb for this loop and wake up its thread to run it.
void queueInLoop(Functor cb);

// Wake up the thread this loop runs in.
void wakeup();

// EventLoop methods that forward to the Poller.
void updateChannel(Channel *channel);
void removeChannel(Channel *channel);
bool hasChannel(Channel *channel);

// True when the caller is on the thread this loop belongs to.
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
private:
void handleRead(); // wake up
void doPendingFunctors(); // run the queued callbacks

using ChannelList = std::vector<Channel*>;

std::atomic_bool looping_; // atomic flag (CAS-based)
std::atomic_bool quit_; // signals the loop to exit

const pid_t threadId_; // id of the thread this loop lives on

Timestamp pollReturnTime_; // time the poller returned the active channels
std::unique_ptr<Poller> poller_;

int wakeupFd_; // when mainLoop picks a subloop (round-robin) for a new channel, it writes here to wake that subloop
std::unique_ptr<Channel> wakeupChannel_;

ChannelList activeChannels_;

std::atomic_bool callingPendingFunctors_; // loop is currently running queued callbacks
std::vector<Functor> pendingFunctors_; // callbacks queued for this loop
std::mutex mutex_; // guards pendingFunctors_
};

EventLoop.cc

#include "EventLoop.h"
#include "Logger.h"
#include "Poller.h"
#include "Channel.h"

#include <sys/eventfd.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <memory>

// At most one EventLoop per thread; this thread-local pointer enforces it.
__thread EventLoop *t_loopInThisThread = nullptr;

// Default timeout for the poller's IO-multiplexing wait (ms).
const int kPollTimeMs = 10000;

// Create the wakeupfd used to notify a subReactor that a new channel arrived.
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
LOG_FATAL("eventfd error:%d \n", errno);
}
return evtfd;
}

EventLoop::EventLoop()
: looping_(false)
, quit_(false)
, callingPendingFunctors_(false)
, threadId_(CurrentThread::tid())
, poller_(Poller::newDefaultPoller(this))
, wakeupFd_(createEventfd())
, wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if (t_loopInThisThread)
{
// A loop already exists on this thread: abort.
LOG_FATAL("Another EventLoop %p exists in this thread %d \n", t_loopInThisThread, threadId_);
}
else
{
t_loopInThisThread = this;
}

// Register the wakeupfd's event type and its read callback.
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
// Every EventLoop listens for EPOLLIN on its own wakeupchannel.
wakeupChannel_->enableReading();
}

EventLoop::~EventLoop()
{
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}

// Run the event loop until quit() is called.
void EventLoop::loop()
{
looping_ = true;
quit_ = false;

LOG_INFO("EventLoop %p start looping \n", this);

while(!quit_)
{
activeChannels_.clear();
// Waits on two kinds of fds: client fds and the wakeupfd.
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel *channel : activeChannels_)
{
// The poller reported which channels fired; each handles its own events.
channel->handleEvent(pollReturnTime_);
}
// Run the callbacks queued for this loop.
/**
 * mainLoop (IO thread) accepts fds and registers a callback for a subloop;
 * after waking the subloop, the call below runs those queued callbacks.
 */
doPendingFunctors();
}

LOG_INFO("EventLoop %p stop looping. \n", this);
looping_ = false;
}

// Quit the event loop. Two cases: 1. quit() runs on the loop's own thread;
// 2. quit() runs on another thread, in which case the loop must be woken so
// its poll() returns and the while(!quit_) test is re-evaluated.
/**
 * mainLoop
 *
 * no ==================== producer-consumer thread-safe queue between loops
 *
 * subLoop1 subLoop2 subLoop3
 */
void EventLoop::quit()
{
quit_ = true;

// Called from another thread, e.g. a subloop (worker) quitting the mainLoop (IO).
if (!isInLoopThread())
{
wakeup();
}
}

// Run cb in this loop's thread: immediately when the caller is already on
// it, otherwise queue it and wake the loop's thread.
void EventLoop::runInLoop(Functor cb)
{
    if (isInLoopThread()) // already on the loop thread: run synchronously
    {
        cb();
    }
    else // on another thread: queue and wake the loop's thread
    {
        // Move instead of copy — the functor may own captured state.
        queueInLoop(std::move(cb));
    }
}
// Queue cb for this loop and wake its thread when necessary.
void EventLoop::queueInLoop(Functor cb)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // Move instead of copy — the functor may own captured state.
        pendingFunctors_.emplace_back(std::move(cb));
    }

    // Wake the loop's thread when we're not on it, or when the loop is
    // currently inside doPendingFunctors() and a new callback just arrived
    // (otherwise it could block in poll() before running it).
    if (!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup(); // wake the thread this loop runs on
    }
}

// Drain the eventfd after a wakeup() notification; the value is unused.
void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = read(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        // Fix: %zd is the correct conversion for ssize_t (was %lu).
        LOG_ERROR("EventLoop::handleRead() reads %zd bytes instead of 8", n);
    }
}

// Wake the thread this loop runs on: writing to wakeupFd_ makes the
// wakeupChannel readable, so the loop's poll() returns.
void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = write(wakeupFd_, &one, sizeof one);
    if (n != sizeof one)
    {
        // Fix: %zd is the correct conversion for ssize_t (was %lu).
        LOG_ERROR("EventLoop::wakeup() writes %zd bytes instead of 8 \n", n);
    }
}

// EventLoop methods that simply forward to the Poller.
void EventLoop::updateChannel(Channel *channel)
{
poller_->updateChannel(channel);
}

void EventLoop::removeChannel(Channel *channel)
{
poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel)
{
return poller_->hasChannel(channel);
}

void EventLoop::doPendingFunctors() // run the queued callbacks
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;

{
// Swap the queue out under the lock so the callbacks run without holding
// it — queueInLoop() from other threads is not blocked meanwhile.
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}

for (const Functor &functor : functors)
{
functor(); // run each callback queued for this loop
}

callingPendingFunctors_ = false;
}

10.Thread类

Thread.h

#pragma once

#include "noncopyable.h"

#include <functional>
#include <thread>
#include <memory>
#include <unistd.h>
#include <string>
#include <atomic>

// Wraps one std::thread plus its metadata (tid, name) with deferred start.
class Thread : noncopyable
{
public:
using ThreadFunc = std::function<void()>;

explicit Thread(ThreadFunc, const std::string &name = std::string());
~Thread();

void start();
void join();

bool started() const { return started_; }
pid_t tid() const { return tid_; }
const std::string& name() const { return name_; }

static int numCreated() { return numCreated_; }
private:
void setDefaultName();

bool started_;
bool joined_;
// Held by shared_ptr so the std::thread is only created inside start(),
// not when the Thread object itself is constructed.
std::shared_ptr<std::thread> thread_;
pid_t tid_;
ThreadFunc func_;
std::string name_;
static std::atomic_int numCreated_; // count of Thread objects ever created
};

Thread.cc

#include "Thread.h"
#include "CurrentThread.h"

#include <semaphore.h>

std::atomic_int Thread::numCreated_(0);

Thread::Thread(ThreadFunc func, const std::string &name)
: started_(false)
, joined_(false)
, tid_(0)
, func_(std::move(func))
, name_(name)
{
setDefaultName();
}

Thread::~Thread()
{
// A started but never-joined thread is detached so the runtime reclaims
// its resources instead of std::terminate firing on destruction.
if (started_ && !joined_)
{
thread_->detach(); // std::thread's detach
}
}

// Start the underlying thread and block until it has published its tid,
// so tid() is valid as soon as start() returns.
void Thread::start()
{
    started_ = true;
    sem_t sem;
    sem_init(&sem, false, 0);

    // Launch the thread; it reports its tid before running func_.
    thread_ = std::make_shared<std::thread>([&](){
        // Record this new thread's tid.
        tid_ = CurrentThread::tid();
        sem_post(&sem); // tid_ is now valid — release start()
        // Run the user's thread function.
        func_();
    });

    // Must wait here for the new thread's tid to be captured.
    sem_wait(&sem);
    // Fix: destroy the semaphore — the original leaked it. Safe here
    // because the new thread has already posted before we passed sem_wait.
    sem_destroy(&sem);
}

void Thread::join()
{
joined_ = true;
thread_->join();
}

// Give unnamed threads a default "ThreadN" name based on the global counter.
void Thread::setDefaultName()
{
int num = ++numCreated_;
if (name_.empty())
{
char buf[32] = {0};
snprintf(buf, sizeof buf, "Thread%d", num);
name_ = buf;
}
}

11.EventLoopThread类

EventLoopThread.h

#pragma once

#include "noncopyable.h"
#include "Thread.h"

#include <functional>
#include <mutex>
#include <condition_variable>
#include <string>

class EventLoop;

// Binds one thread to one EventLoop (one loop per thread).
class EventLoopThread : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

EventLoopThread(const ThreadInitCallback &cb = ThreadInitCallback(),
const std::string &name = std::string());
~EventLoopThread();

// Start the thread and return its EventLoop once it is ready.
EventLoop* startLoop();
private:
void threadFunc();

EventLoop *loop_; // the loop owned by the spawned thread (nullptr until ready)
bool exiting_;
Thread thread_;
std::mutex mutex_;
std::condition_variable cond_; // signals that loop_ has been published
ThreadInitCallback callback_; // optional per-thread initialization hook
};

EventLoopThread.cc

#include "EventLoopThread.h"
#include "EventLoop.h"


EventLoopThread::EventLoopThread(const ThreadInitCallback &cb,
const std::string &name)
: loop_(nullptr)
, exiting_(false)
, thread_(std::bind(&EventLoopThread::threadFunc, this), name)
, mutex_()
, cond_()
, callback_(cb)
{

}

EventLoopThread::~EventLoopThread()
{
exiting_ = true;
if (loop_ != nullptr)
{
// Ask the loop to stop, then wait for its thread to finish.
loop_->quit();
thread_.join();
}
}

// Start the underlying thread, then wait until it has created its EventLoop.
EventLoop* EventLoopThread::startLoop()
{
thread_.start(); // spawn the new thread (it runs threadFunc below)

EventLoop *loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
while ( loop_ == nullptr )
{
cond_.wait(lock); // wait for threadFunc to publish loop_
}
loop = loop_;
}
return loop;
}

// Runs inside the newly created thread.
void EventLoopThread::threadFunc()
{
EventLoop loop; // a stack EventLoop bound to this thread: one loop per thread

if (callback_)
{
callback_(&loop);
}

{
// Publish the loop and wake the thread blocked in startLoop().
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one();
}

loop.loop(); // EventLoop::loop => Poller::poll; blocks until quit()
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}

12.EventLoopThreadPool类

EventLoopThreadPool.h

#pragma once
#include "noncopyable.h"

#include <functional>
#include <string>
#include <vector>
#include <memory>

class EventLoop;
class EventLoopThread;

// A pool of EventLoopThreads; baseLoop_ round-robins new channels to them.
class EventLoopThreadPool : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg);
~EventLoopThreadPool();

void setThreadNum(int numThreads) { numThreads_ = numThreads; }

void start(const ThreadInitCallback &cb = ThreadInitCallback());

// In multithreaded mode, baseLoop_ assigns channels to subloops round-robin.
EventLoop* getNextLoop();

std::vector<EventLoop*> getAllLoops();

bool started() const { return started_; }
const std::string name() const { return name_; }
private:

EventLoop *baseLoop_; // the user's main EventLoop
std::string name_;
bool started_;
int numThreads_;
int next_; // round-robin cursor into loops_
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_; // loops owned by the worker threads
};

EventLoopThreadPool.cc

#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"

#include <memory>

EventLoopThreadPool::EventLoopThreadPool(EventLoop *baseLoop, const std::string &nameArg)
: baseLoop_(baseLoop)
, name_(nameArg)
, started_(false)
, numThreads_(0)
, next_(0)
{}

// Nothing to free: each worker's EventLoop is a stack object inside
// EventLoopThread::threadFunc, owned by its own thread.
EventLoopThreadPool::~EventLoopThreadPool()
{}

// Start numThreads_ event-loop threads; with zero threads, cb (if any)
// runs directly on baseLoop_.
void EventLoopThreadPool::start(const ThreadInitCallback &cb)
{
    started_ = true;

    for (int i = 0; i < numThreads_; ++i)
    {
        // Fix: build the "name + index" string with std::string instead of
        // `char buf[name_.size() + 32]` — variable-length arrays are not
        // standard C++.
        std::string threadName = name_ + std::to_string(i);
        EventLoopThread *t = new EventLoopThread(cb, threadName);
        threads_.push_back(std::unique_ptr<EventLoopThread>(t));
        // The thread creates and runs its own EventLoop; keep its address.
        loops_.push_back(t->startLoop());
    }

    // Whole server runs on a single thread: everything stays on baseLoop_.
    if (numThreads_ == 0 && cb)
    {
        cb(baseLoop_);
    }
}

// 如果工作在多线程中,baseLoop_默认以轮询的方式分配channel给subloop
// Round-robin: hand out the next subloop; fall back to baseLoop_ when the
// pool has no worker threads.
EventLoop* EventLoopThreadPool::getNextLoop()
{
    EventLoop *loop = baseLoop_;

    if (!loops_.empty()) // pick the next loop by round-robin
    {
        loop = loops_[next_];
        ++next_;
        // Cast fixes the signed/unsigned comparison warning.
        if (static_cast<size_t>(next_) >= loops_.size())
        {
            next_ = 0;
        }
    }

    return loop;
}

// Return every loop in the pool; with no workers, return baseLoop_ alone.
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops()
{
    if (loops_.empty())
    {
        return std::vector<EventLoop*>(1, baseLoop_);
    }
    // Fix: the original wrote `loops_;` with no `return`, so control fell
    // off the end of a value-returning function — undefined behavior.
    return loops_;
}

13.Socket类

Socket.h

#pragma once

#include "noncopyable.h"

class InetAddress;

// Wraps a socket fd: closes it on destruction and exposes common operations.
class Socket : noncopyable
{
public:
explicit Socket(int sockfd)
: sockfd_(sockfd)
{}

// Closes sockfd_.
~Socket();

int fd() const { return sockfd_; }
void bindAddress(const InetAddress &localaddr);
void listen();
// Accept one connection; on success fills peeraddr and returns the connfd.
int accept(InetAddress *peeraddr);

// Shut down the write half of the connection.
void shutdownWrite();

void setTcpNoDelay(bool on); // TCP_NODELAY
void setReuseAddr(bool on); // SO_REUSEADDR
void setReusePort(bool on); // SO_REUSEPORT
void setKeepAlive(bool on); // SO_KEEPALIVE
private:
const int sockfd_;
};

Socket.cc

#include "Socket.h"
#include "Logger.h"
#include "InetAddress.h"

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

// RAII: close the fd when the Socket goes out of scope.
Socket::~Socket()
{
close(sockfd_);
}

// Bind to the given local address; fatal on failure (server cannot run).
void Socket::bindAddress(const InetAddress &localaddr)
{
if (0 != ::bind(sockfd_, (sockaddr*)localaddr.getSockAddr(), sizeof(sockaddr_in)))
{
LOG_FATAL("bind sockfd:%d fail \n", sockfd_);
}
}

// Start listening with a backlog of 1024; fatal on failure.
void Socket::listen()
{
if (0 != ::listen(sockfd_, 1024))
{
LOG_FATAL("listen sockfd:%d fail \n", sockfd_);
}
}

int Socket::accept(InetAddress *peeraddr)
{
/**
 * accept4 lets us set the connfd non-blocking + close-on-exec atomically,
 * matching the Reactor model: one loop per thread, poller + non-blocking IO.
 */
sockaddr_in addr;
socklen_t len = sizeof addr;
bzero(&addr, sizeof addr);
int connfd = ::accept4(sockfd_, (sockaddr*)&addr, &len, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (connfd >= 0)
{
// Report the peer's address back to the caller.
peeraddr->setSockAddr(addr);
}
return connfd;
}

// Close only the write half, allowing pending reads to finish.
void Socket::shutdownWrite()
{
if (::shutdown(sockfd_, SHUT_WR) < 0)
{
LOG_ERROR("shutdownWrite error");
}
}

// Disable (on=true) Nagle's algorithm for lower latency.
void Socket::setTcpNoDelay(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof optval);
}

void Socket::setReuseAddr(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);
}

void Socket::setReusePort(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, SOL_SOCKET, SO_REUSEPORT, &optval, sizeof optval);
}

void Socket::setKeepAlive(bool on)
{
int optval = on ? 1 : 0;
::setsockopt(sockfd_, SOL_SOCKET, SO_KEEPALIVE, &optval, sizeof optval);
}

14.Acceptor类

Acceptor.h

#pragma once
#include "noncopyable.h"
#include "Socket.h"
#include "Channel.h"

#include <functional>

class EventLoop;
class InetAddress;

// Owns the listening socket on mainLoop and hands each new connection
// to a user-installed callback.
class Acceptor : noncopyable
{
public:
using NewConnectionCallback = std::function<void(int sockfd, const InetAddress&)>;
Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport);
~Acceptor();

// Callback invoked with (connfd, peerAddr) for every accepted connection.
void setNewConnectionCallback(const NewConnectionCallback &cb)
{
newConnectionCallback_ = cb;
}

bool listenning() const { return listenning_; }
void listen();
private:
void handleRead();

EventLoop *loop_; // the user-defined baseLoop, a.k.a. mainLoop
Socket acceptSocket_; // listening socket
Channel acceptChannel_; // channel watching the listening fd
NewConnectionCallback newConnectionCallback_;
bool listenning_;
};

Acceptor.cc

#include "Acceptor.h"
#include "Logger.h"
#include "InetAddress.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <unistd.h>


// Create a non-blocking, close-on-exec TCP listening socket;
// LOG_FATAL aborts the process on failure.
static int createNonblocking()
{
    int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
    if (sockfd < 0)
    {
        LOG_FATAL("%s:%s:%d listen socket create err:%d \n", __FILE__, __FUNCTION__, __LINE__, errno);
    }
    return sockfd; // fix: the original never returned the fd (UB for callers)
}

Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop)
, acceptSocket_(createNonblocking()) // socket
, acceptChannel_(loop, acceptSocket_.fd())
, listenning_(false)
{
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr); // bind
// TcpServer::start() -> Acceptor::listen: on a new connection, run a
// callback that hands the connfd's channel to a subloop.
// baseLoop => acceptChannel_(listenfd) =>
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{
acceptChannel_.disableAll();
acceptChannel_.remove();
}

void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // listen
acceptChannel_.enableReading(); // register acceptChannel_ with the Poller
}

// The listenfd became readable: a new client is connecting.
void Acceptor::handleRead()
{
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr); // round-robin pick a subLoop, wake it, hand over the new client's Channel
}
else
{
// Nobody wants the connection: close it immediately.
::close(connfd);
}
}
else
{
LOG_ERROR("%s:%s:%d accept err:%d \n", __FILE__, __FUNCTION__, __LINE__, errno);
if (errno == EMFILE)
{
// Process fd limit reached.
LOG_ERROR("%s:%s:%d sockfd reached limit! \n", __FILE__, __FUNCTION__, __LINE__);
}
}
}

15.Buffer类

Buffer.h

#pragma once

#include <vector>
#include <string>
#include <algorithm>

// 网络库底层的缓冲器类型定义
// Low-level growable byte buffer used by the network library.
//
// Layout:   [ prependable | readable | writable ]
//           0       readerIndex_  writerIndex_  buffer_.size()
class Buffer
{
public:
    static const size_t kCheapPrepend = 8;   // space reserved in front
    static const size_t kInitialSize = 1024; // initial writable capacity

    explicit Buffer(size_t initialSize = kInitialSize)
        : buffer_(kCheapPrepend + initialSize)
        , readerIndex_(kCheapPrepend)
        , writerIndex_(kCheapPrepend)
    {}

    // Number of bytes available to read.
    size_t readableBytes() const
    {
        return writerIndex_ - readerIndex_;
    }

    // Number of bytes writable without growing.
    size_t writableBytes() const
    {
        return buffer_.size() - writerIndex_;
    }

    // Bytes in front of the readable region (reclaimable space).
    size_t prependableBytes() const
    {
        return readerIndex_;
    }

    // Start address of the readable data.
    const char* peek() const
    {
        return begin() + readerIndex_;
    }

    // Consume len readable bytes (onMessage: string <- Buffer).
    void retrieve(size_t len)
    {
        if (len < readableBytes())
        {
            // Only part of the readable data was consumed; the rest stays
            // between readerIndex_ + len and writerIndex_.
            readerIndex_ += len;
        }
        else // len == readableBytes()
        {
            retrieveAll();
        }
    }

    // Reset both indices: the whole buffer becomes writable again.
    void retrieveAll()
    {
        readerIndex_ = writerIndex_ = kCheapPrepend;
    }

    // Drain all readable data reported by onMessage into a std::string.
    std::string retrieveAllAsString()
    {
        return retrieveAsString(readableBytes());
    }

    std::string retrieveAsString(size_t len)
    {
        std::string result(peek(), len);
        retrieve(len); // bytes were copied out above, so consume them now
        return result;
    }

    // Guarantee at least len writable bytes (buffer_.size() - writerIndex_).
    void ensureWriteableBytes(size_t len)
    {
        if (writableBytes() < len)
        {
            makeSpace(len); // grow or compact
        }
    }

    // Append [data, data+len) to the writable region.
    void append(const char *data, size_t len)
    {
        ensureWriteableBytes(len);
        std::copy(data, data + len, beginWrite());
        writerIndex_ += len;
    }

    char* beginWrite()
    {
        return begin() + writerIndex_;
    }

    const char* beginWrite() const
    {
        return begin() + writerIndex_;
    }

    // Read data from fd into the buffer.
    ssize_t readFd(int fd, int* saveErrno);
    // Write the buffer's data out to fd.
    ssize_t writeFd(int fd, int* saveErrno);

private:
    // Address of the underlying vector's first element.
    char* begin()
    {
        return &*buffer_.begin();
    }
    const char* begin() const
    {
        return &*buffer_.begin();
    }

    // Make room for len more bytes: either grow the vector, or slide the
    // readable data forward to reclaim the prependable slack.
    void makeSpace(size_t len)
    {
        if (writableBytes() + prependableBytes() < len + kCheapPrepend)
        {
            buffer_.resize(writerIndex_ + len);
        }
        else
        {
            // Enough slack in front: compact instead of reallocating.
            // (Fix: local renamed from the original's typo "readalbe".)
            size_t readable = readableBytes();
            std::copy(begin() + readerIndex_,
                      begin() + writerIndex_,
                      begin() + kCheapPrepend);
            readerIndex_ = kCheapPrepend;
            writerIndex_ = readerIndex_ + readable;
        }
    }

    std::vector<char> buffer_;
    size_t readerIndex_;
    size_t writerIndex_;
};

Buffer.cc

#include "Buffer.h"

#include <errno.h>
#include <sys/uio.h>
#include <unistd.h>

/**
* 从fd上读取数据 Poller工作在LT模式
* Buffer缓冲区是有大小的! 但是从fd上读数据的时候,却不知道tcp数据最终的大小
*/
/**
 * Read data from fd into the buffer. The Poller works in LT mode, so any
 * bytes left unread simply trigger another readable event later.
 *
 * We cannot know in advance how much TCP data is pending, so scatter-read
 * into the buffer's writable space plus a 64K stack buffer in a single
 * readv call; if the stack buffer was used, append its contents afterwards
 * (which grows buffer_ as needed).
 *
 * @param fd        descriptor to read from
 * @param saveErrno out-param; receives errno when readv fails
 * @return readv's result: bytes read, 0 on EOF, or -1 on error
 */
ssize_t Buffer::readFd(int fd, int* saveErrno)
{
    char extrabuf[65536] = {0}; // 64K scratch space on the stack

    struct iovec vec[2];

    const size_t writable = writableBytes(); // remaining writable space in the buffer
    vec[0].iov_base = begin() + writerIndex_;
    vec[0].iov_len = writable;

    vec[1].iov_base = extrabuf;
    vec[1].iov_len = sizeof extrabuf;

    // Use the stack buffer only when the writable space is smaller than
    // 64K, so a single readv can always take in at least 64K.
    const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
    const ssize_t n = ::readv(fd, vec, iovcnt);
    if (n < 0)
    {
        *saveErrno = errno;
    }
    // n >= 0 here; cast makes the signed/unsigned comparison explicit.
    else if (static_cast<size_t>(n) <= writable) // everything fit in the buffer's writable space
    {
        writerIndex_ += n;
    }
    else // extrabuf received the overflow
    {
        writerIndex_ = buffer_.size();
        append(extrabuf, n - writable); // grow and copy the n - writable overflow bytes
    }

    return n;
}

// Write the buffer's readable bytes to fd. On failure, errno is stored in
// *saveErrno. Returns write()'s result; the caller is responsible for
// retrieving however many bytes were actually sent.
ssize_t Buffer::writeFd(int fd, int* saveErrno)
{
ssize_t n = ::write(fd, peek(), readableBytes());
if (n < 0)
{
*saveErrno = errno;
}
return n;
}

16.TcpConnection类

Callbacks.h

#pragma once

#include <memory>
#include <functional>

class Buffer;
class TcpConnection;
class Timestamp;

// Shared-ownership handle: keeps the TcpConnection alive while callbacks hold it.
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void (const TcpConnectionPtr&)>; // connect/disconnect notification
using CloseCallback = std::function<void (const TcpConnectionPtr&)>; // internal close notification
using WriteCompleteCallback = std::function<void (const TcpConnectionPtr&)>; // output buffer fully flushed
using MessageCallback = std::function<void (const TcpConnectionPtr&,
Buffer*,
Timestamp)>; // inbound data ready in the buffer
using HighWaterMarkCallback = std::function<void (const TcpConnectionPtr&, size_t)>; // output backlog crossed the high-water mark

TcpConnection.h

#pragma once

#include "noncopyable.h"
#include "InetAddress.h"
#include "Callbacks.h"
#include "Buffer.h"
#include "Timestamp.h"

#include <memory>
#include <string>
#include <atomic>

class Channel;
class EventLoop;
class Socket;

/**
* TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd
* =》 TcpConnection 设置回调 =》 Channel =》 Poller =》 Channel的回调操作
*
*/
/**
 * TcpServer => Acceptor => a new user connects; accept() yields connfd
 * => TcpConnection installs callbacks => Channel => Poller => Channel's
 * callback handlers run.
 */
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection>
{
public:
TcpConnection(EventLoop *loop,
const std::string &name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
~TcpConnection();

EventLoop* getLoop() const { return loop_; }
const std::string& name() const { return name_; }
const InetAddress& localAddress() const { return localAddr_; }
const InetAddress& peerAddress() const { return peerAddr_; }

bool connected() const { return state_ == kConnected; }

// Send data to the peer (forwards to the owning loop when called from another thread).
void send(const std::string &buf);
// Gracefully close: shut down the write end after pending output is flushed.
void shutdown();

void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }

void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }

void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }

void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

void setCloseCallback(const CloseCallback& cb)
{ closeCallback_ = cb; }

// Called once when the connection is established.
void connectEstablished();
// Called once when the connection is torn down.
void connectDestroyed();
private:
enum StateE {kDisconnected, kConnecting, kConnected, kDisconnecting};
void setState(StateE state) { state_ = state; }

void handleRead(Timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();

void sendInLoop(const void* message, size_t len);
void shutdownInLoop();

EventLoop *loop_; // never the baseLoop: TcpConnections are managed in subLoops
const std::string name_;
std::atomic_int state_;
bool reading_;

// Mirrors Acceptor's setup: Acceptor => mainLoop, TcpConnection => subLoop
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;

const InetAddress localAddr_;
const InetAddress peerAddr_;

ConnectionCallback connectionCallback_; // invoked on connect/disconnect
MessageCallback messageCallback_; // invoked when data arrives
WriteCompleteCallback writeCompleteCallback_; // invoked after output fully sent
HighWaterMarkCallback highWaterMarkCallback_; // invoked when output backlog crosses highWaterMark_
CloseCallback closeCallback_; // internal close notification (set by TcpServer)
size_t highWaterMark_;

Buffer inputBuffer_; // receive buffer
Buffer outputBuffer_; // send buffer
};

TcpConnection.cc

#include "TcpConnection.h"
#include "Logger.h"
#include "Socket.h"
#include "Channel.h"
#include "EventLoop.h"

#include <functional>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <strings.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <string>

// Guard: the loop handed to TcpConnection must not be null; abort otherwise
// (LOG_FATAL exits the process).
static EventLoop* CheckLoopNotNull(EventLoop *loop)
{
if (loop == nullptr)
{
LOG_FATAL("%s:%s:%d TcpConnection Loop is null! \n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}

// Construct a connection around an already-accepted sockfd and wire the
// channel's event callbacks to this object's handlers.
TcpConnection::TcpConnection(EventLoop *loop,
const std::string &nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CheckLoopNotNull(loop))
, name_(nameArg)
, state_(kConnecting)
, reading_(true)
, socket_(new Socket(sockfd))
, channel_(new Channel(loop, sockfd))
, localAddr_(localAddr)
, peerAddr_(peerAddr)
, highWaterMark_(64*1024*1024) // 64M
{
// Register the channel's callbacks: when the poller notifies the channel
// of an interesting event, the channel invokes the matching handler below.
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, std::placeholders::_1)
);
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this)
);
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this)
);
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this)
);

LOG_INFO("TcpConnection::ctor[%s] at fd=%d\n", name_.c_str(), sockfd);
socket_->setKeepAlive(true);
}


// Log destruction for diagnostics (fd and final state).
TcpConnection::~TcpConnection()
{
LOG_INFO("TcpConnection::dtor[%s] at fd=%d state=%d \n",
name_.c_str(), channel_->fd(), (int)state_);
}

// Send data to the peer. Safe to call from any thread: when called off the
// owning loop's thread, the actual write is forwarded to that loop.
// @param buf payload to send
void TcpConnection::send(const std::string &buf)
{
    if (state_ == kConnected)
    {
        if (loop_->isInLoopThread())
        {
            sendInLoop(buf.c_str(), buf.size());
        }
        else
        {
            // Bug fix: the deferred call must not capture buf.c_str() — the
            // caller's string may be destroyed before the loop thread runs
            // the functor, leaving a dangling pointer. Copy the payload into
            // the closure instead.
            std::string message = buf;
            loop_->runInLoop([this, message]() {
                sendInLoop(message.data(), message.size());
            });
        }
    }
}

/**
* 发送数据 应用写的快, 而内核发送数据慢, 需要把待发送数据写入缓冲区, 而且设置了水位回调
*/
void TcpConnection::sendInLoop(const void* data, size_t len)
{
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;

// 之前调用过该connection的shutdown,不能再进行发送了
if (state_ == kDisconnected)
{
LOG_ERROR("disconnected, give up writing!");
return;
}

// 表示channel_第一次开始写数据,而且缓冲区没有待发送数据
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = ::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
// 既然在这里数据全部发送完成,就不用再给channel设置epollout事件了
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this())
);
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_ERROR("TcpConnection::sendInLoop");
if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET
{
faultError = true;
}
}
}
}

// 说明当前这一次write,并没有把数据全部发送出去,剩余的数据需要保存到缓冲区当中,然后给channel
// 注册epollout事件,poller发现tcp的发送缓冲区有空间,会通知相应的sock-channel,调用writeCallback_回调方法
// 也就是调用TcpConnection::handleWrite方法,把发送缓冲区中的数据全部发送完成
if (!faultError && remaining > 0)
{
// 目前发送缓冲区剩余的待发送数据的长度
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(
std::bind(highWaterMarkCallback_, shared_from_this(), oldLen+remaining)
);
}
outputBuffer_.append((char*)data + nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting(); // 这里一定要注册channel的写事件,否则poller不会给channel通知epollout
}
}
}

// 关闭连接
void TcpConnection::shutdown()
{
if (state_ == kConnected)
{
setState(kDisconnecting);
loop_->runInLoop(
std::bind(&TcpConnection::shutdownInLoop, this)
);
}
}

// Runs in the owning loop. Shuts the write end down only when the channel
// is no longer writing (handleWrite re-invokes this after draining the
// output buffer while state_ == kDisconnecting).
void TcpConnection::shutdownInLoop()
{
if (!channel_->isWriting()) // outputBuffer_ has been fully flushed
{
socket_->shutdownWrite(); // close the write end
}
}

// 连接建立
void TcpConnection::connectEstablished()
{
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); // 向poller注册channel的epollin事件

// 新连接建立,执行回调
connectionCallback_(shared_from_this());
}

// 连接销毁
void TcpConnection::connectDestroyed()
{
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll(); // 把channel的所有感兴趣的事件,从poller中del掉
connectionCallback_(shared_from_this());
}
channel_->remove(); // 把channel从poller中删除掉
}

// Readable event: drain the socket into inputBuffer_. n > 0 delivers data
// to the user's onMessage callback; n == 0 means the peer closed; n < 0 is
// an error.
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
// A connected peer sent data; invoke the user-supplied onMessage callback.
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}

// Writable event: flush outputBuffer_ to the socket. Once the buffer is
// drained, stop watching EPOLLOUT, queue writeCompleteCallback_, and finish
// a pending graceful shutdown if one was requested.
void TcpConnection::handleWrite()
{
if (channel_->isWriting())
{
int savedErrno = 0;
ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno);
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
// Wake loop_'s thread and run the callback there.
loop_->queueInLoop(
std::bind(writeCompleteCallback_, shared_from_this())
);
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_ERROR("TcpConnection::handleWrite");
}
}
else
{
LOG_ERROR("TcpConnection fd=%d is down, no more writing \n", channel_->fd());
}
}

// poller => channel::closeCallback => TcpConnection::handleClose
void TcpConnection::handleClose()
{
LOG_INFO("TcpConnection::handleClose fd=%d state=%d \n", channel_->fd(), (int)state_);
setState(kDisconnected);
channel_->disableAll();

TcpConnectionPtr connPtr(shared_from_this());
connectionCallback_(connPtr); // 执行连接关闭的回调
closeCallback_(connPtr); // 关闭连接的回调 执行的是TcpServer::removeConnection回调方法
}

// Error event: fetch and log the pending socket error via SO_ERROR.
// Logs only; closing the connection is left to the close path.
void TcpConnection::handleError()
{
    int optval = 0; // value-initialize so err is well-defined in all paths
    socklen_t optlen = sizeof optval;
    int err = 0;
    if (::getsockopt(channel_->fd(), SOL_SOCKET, SO_ERROR, &optval, &optlen) < 0)
    {
        err = errno; // getsockopt itself failed
    }
    else
    {
        err = optval; // the pending socket error (0 if none)
    }
    LOG_ERROR("TcpConnection::handleError name:%s - SO_ERROR:%d \n", name_.c_str(), err);
}

17.TcpServer类

TcpServer.h

#pragma once

/**
* 用户使用muduo编写服务器程序
*/
#include "EventLoop.h"
#include "Acceptor.h"
#include "InetAddress.h"
#include "noncopyable.h"
#include "EventLoopThreadPool.h"
#include "Callbacks.h"
#include "TcpConnection.h"
#include "Buffer.h"

#include <functional>
#include <string>
#include <memory>
#include <atomic>
#include <unordered_map>

// 对外的服务器编程使用的类
class TcpServer : noncopyable
{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;

enum Option
{
kNoReusePort,
kReusePort,
};

TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const std::string &nameArg,
Option option = kNoReusePort);
~TcpServer();

void setThreadInitcallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; }
void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }

// 设置底层subloop的个数
void setThreadNum(int numThreads);

// 开启服务器监听
void start();
private:
void newConnection(int sockfd, const InetAddress &peerAddr);
void removeConnection(const TcpConnectionPtr &conn);
void removeConnectionInLoop(const TcpConnectionPtr &conn);

using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>;

EventLoop *loop_; // baseLoop 用户定义的loop

const std::string ipPort_;
const std::string name_;

std::unique_ptr<Acceptor> acceptor_; // 运行在mainLoop,任务就是监听新连接事件

std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per thread

ConnectionCallback connectionCallback_; // 有新连接时的回调
MessageCallback messageCallback_; // 有读写消息时的回调
WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调

ThreadInitCallback threadInitCallback_; // loop线程初始化的回调

std::atomic_int started_;

int nextConnId_;
ConnectionMap connections_; // 保存所有的连接
};

TcpServer.cc

#include "TcpServer.h"
#include "Logger.h"
#include "TcpConnection.h"

#include <strings.h>
#include <functional>

// Guard: the mainLoop handed to TcpServer must not be null; abort otherwise
// (LOG_FATAL exits the process).
static EventLoop* CheckLoopNotNull(EventLoop *loop)
{
if (loop == nullptr)
{
LOG_FATAL("%s:%s:%d mainLoop is null! \n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}

// Build the server: create the acceptor on the main loop plus the subloop
// thread pool, and hook up the acceptor's new-connection callback.
TcpServer::TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const std::string &nameArg,
Option option)
: loop_(CheckLoopNotNull(loop))
, ipPort_(listenAddr.toIpPort())
, name_(nameArg)
, acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
, threadPool_(new EventLoopThreadPool(loop, name_))
, connectionCallback_()
, messageCallback_()
, nextConnId_(1)
, started_(0)
{
// When a new user connects, the acceptor runs TcpServer::newConnection.
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,
std::placeholders::_1, std::placeholders::_2));
}

// Destroy all live connections, dispatching teardown to each one's own loop.
TcpServer::~TcpServer()
{
for (auto &item : connections_)
{
// Local shared_ptr: once it leaves this scope, the TcpConnection it
// owns can be released automatically.
TcpConnectionPtr conn(item.second);
item.second.reset();

// Tear the connection down in its own loop.
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn)
);
}
}

// 设置底层subloop的个数
void TcpServer::setThreadNum(int numThreads)
{
threadPool_->setThreadNum(numThreads);
}

// 开启服务器监听 loop.loop()
void TcpServer::start()
{
if (started_++ == 0) // 防止一个TcpServer对象被start多次
{
threadPool_->start(threadInitCallback_); // 启动底层的loop线程池
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}

// 有一个新的客户端的连接,acceptor会执行这个回调操作
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
// 轮询算法,选择一个subLoop,来管理channel
EventLoop *ioLoop = threadPool_->getNextLoop();
char buf[64] = {0};
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;

LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());

// 通过sockfd获取其绑定的本机的ip地址和端口信息
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
{
LOG_ERROR("sockets::getLocalAddr");
}
InetAddress localAddr(local);

// 根据连接成功的sockfd,创建TcpConnection连接对象
TcpConnectionPtr conn(new TcpConnection(
ioLoop,
connName,
sockfd, // Socket Channel
localAddr,
peerAddr));
connections_[connName] = conn;
// 下面的回调都是用户设置给TcpServer=>TcpConnection=>Channel=>Poller=>notify channel调用回调
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);

// 设置了如何关闭连接的回调 conn->shutDown()
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
);

// 直接调用TcpConnection::connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

// Close notification from a TcpConnection; hop to the main loop to erase it.
void TcpServer::removeConnection(const TcpConnectionPtr &conn)
{
loop_->runInLoop(
std::bind(&TcpServer::removeConnectionInLoop, this, conn)
);
}

// Runs in the main loop: forget the connection, then destroy it in its own loop.
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr &conn)
{
LOG_INFO("TcpServer::removeConnectionInLoop [%s] - connection %s\n",
name_.c_str(), conn->name().c_str());

connections_.erase(conn->name());
EventLoop *ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn)
);
}

18.编译运行

CMakeLists.txt

cmake_minimum_required(VERSION 2.5)
project(mymuduo)

# cmake => makefile => make
# mymuduo is built as a shared library; put it under <project root>/lib
set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib)
# Debug info, the C++11 standard, and position-independent code for the .so
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++11 -fPIC")

# All source files in the current directory take part in the build
aux_source_directory(. SRC_LIST)
# Build the shared library mymuduo
add_library(mymuduo SHARED ${SRC_LIST})

autobuild.sh

#!/bin/bash

set -e

# Create the build directory if it does not exist yet.
if [ ! -d `pwd`/build ]; then
mkdir `pwd`/build
fi

rm -rf `pwd`/build/*

cd `pwd`/build &&
cmake .. &&
make

# Back to the project root.
cd ..

# Install: copy headers to /usr/include/mymuduo, the .so to /usr/lib.
if [ ! -d /usr/include/mymuduo ]; then
mkdir /usr/include/mymuduo
fi

for header in `ls *.h`
do
cp $header /usr/include/mymuduo
done

cp `pwd`/lib/libmymuduo.so /usr/lib

ldconfig

19.测试程序

#include <mymuduo/TcpServer.h>
#include <mymuduo/Logger.h>

#include <string>
#include <functional>

// Demo application: an echo server built on the mymuduo library. It sends
// each received message back to the client and then closes the connection.
class EchoServer
{
public:
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// Register the user callbacks.
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);

server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);

// Choose a suitable number of loop threads.
server_.setThreadNum(3);
}
void start()
{
server_.start();
}
private:
// Invoked when a connection is established or torn down.
void onConnection(const TcpConnectionPtr &conn)
{
if (conn->connected())
{
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
}
else
{
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}
}

// Invoked when data arrives: echo it back, then close.
void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp time)
{
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
conn->shutdown(); // shut the write end => EPOLLHUP => closeCallback_
}

EventLoop *loop_;
TcpServer server_;
};

int main()
{
EventLoop loop; // the mainLoop (baseLoop)
InetAddress addr(8000);
EchoServer server(&loop, addr, "EchoServer-01"); // Acceptor: create + bind the non-blocking listenfd
server.start(); // listen; start loopthreads; listenfd => acceptChannel => mainLoop
loop.loop(); // start the mainLoop's underlying Poller

return 0;
}

makefile

# Build the test server against the installed mymuduo shared library.
testserver :
g++ -o testserver testserver.cc -lmymuduo -lpthread -g

clean :
rm -f testserver