首先总体情况:
每个muduo网络库有一个事件驱动循环线程池 EventLoopThreadPool,线程池用在事件驱动循环上层,也就是事件驱动循环是线程池中的一个线程
- 每个TcpServer对应一个事件驱动循环线程池
- 每个线程池中有多个事件驱动线程EventLoopThread
- 每个线程运行一个Eventloop事件循环
- 每个EventLoop事件循环包含一个IO复用Poller, 一个计时器队列TimerQueue
- 每个Poller监听多个Channel, 也就对应一个TcpConnection或者监听套接字,TimeQueue其实也是一个Channel
- 在poll返回后处理激活队列中Channel的过程是同步的,也就是一个一个调用回调函数
- 每个回调函数是在EventLoop所在线程执行
- 调用回调函数的线程和事件驱动主循环所在线程是同一个,也就是同步执行回调函数
- 所有激活的Channel回调结束后,EventLoop继续让Poller监听
调用回调函数的过程中是同步的,所以如果回调函数执行时间很长,那么这个EventLoop所在线程就会等待很久之后才再次Poll
多线程体现在EventLoop的上层,即在EventLoop上层有一个线程池,线程池中每一个线程运行一个EventLoop, 也就是Reactor+线程池的设计模式
EventLoop成员变量
创建时要保存当前时间循环所在的线程,用于之后运行时判断使用EventLoop的线程是否时EventLoop所属的线程threadId_
;保存poll返回的时间,用于计算从激活到调用回调函数的延迟pollReturnTime_
;poller_
时io多路复用,timerQueue_定时器队列
下一个wakeupFd_
:用于唤醒当前线程,因为当前线程主要阻塞在poll函数上,唤醒的方法时手动激活这个wakeupChannel_, 写入几个字节让Channel变为可读, 当然这个Channel也注册到Pooll中
最后一个变量std::vector<Functor> pendingFunctors_
是一个任务容器,存放的是将要执行的回调函数,避免本来属于当前线程的回调函数被其他线程调用,应该把这个回调函数添加到属于它所属的线程,等待它属于的线程被唤醒后调用,满足线程安全
将某个对象暴露给这是非常不安全的,万一这个线程不小心析构了这个对象,而这个对象所属的那个线程正要访问这个对象(例如调用这个对象的接口),这个线程就会崩溃,因为它访问了一个本不存在的对象(已经被析构)
为了解决这个问题即事件循环不属于当前线程,就需要尽量将对这个对象的操作移到它所属的那个线程执行(这里是调用这个对象的接口)。因为每个对象都有它所属的事件驱动循环EventLoop
,这个EventLoop
通常阻塞在poll
上。可以保证的是EventLoop
阻塞的线程就是它所属的那个线程,所以调用poll的线程就是这个对象所属的线程。这就可以让poll
返回后再执行想要调用的函数(??),但是需要手动唤醒poll
,否则一直阻塞在那里会耽误函数的执行。
出现事件驱动循环不属于当前线程的例子:
1.客户端close连接,服务器端某个Channel被激活,原因为EPOLLHUP
2.Channel调用回调函数,即TcpConnection的handleClose
3.handleClose调用TcpServer为handleClose提供的回调函数removeConnection
4.此时执行的是TcpServer的removeConnection函数,
但
这就导致将TcpServer暴露给了TcpConnection所在线程
TcpServer要将这个关闭的TcpConnection从tcp map中删除,就需要调用自己的另一个函数removeConnectionInLoop
为了实现线程安全性
要让 removeConnectionInLoop
在TcpServer自己所在线程执行,
需要先把这个函数添加到队列中存起来,等到回到自己的线程在执行
runInLoop
中的queueInLoop
函数就是将这个函数存起来
当然,如果调用runInLoop所在线程和事件驱动循环线程是同一个线程,那么可以直接调用回调函数
runInLoop
和queueInLoop
代码片段1:EventLoop::runInLoop()
文件名:EventLoop.cc
代码逻辑:判断是否处于当前IO线程,是则执行这个函数,如果不是则将函数加入队列
// 在IO线程中执行某个回调函数,该函数可以跨线程调用
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread()){
// 如果是当前IO线程调用runInLoop,则同步调用cbcb();}else{
// 如果是其它线程调用runInLoop,则异步地将cb添加到队列queueInLoop(cb);}
}
这个队列就是EventLoop类的最后一个变量pendingFunctors_
,将cb放入队列后,我们还需要在必要的时候唤醒IO线程来处理,因为EventLoop通常阻塞在poll上, 所以添加到pendingFunctors_后需要手动唤醒它,不然就一直阻塞在poll,会耽误函数的执行
代码片段2:EventLoop::queueInLoop()
文件名:EventLoop.ccvoid EventLoop::queueInLoop(const Functor& cb)
{
// 把任务加入到队列可能同时被多个线程调用,需要加锁{
MutexLockGuard lock(mutex_);pendingFunctors_.push_back(cb);}//这里的大括号是语句块,把里面的变量作为临时变量处理// 必要的时候有两种情况:// 1.如果调用queueInLoop()的不是IO线程,需要唤醒// 2.如果在IO线程调用queueInLoop(),且此时正在调用pending functor,需要唤醒// 即只有在IO线程的事件回调中调用queueInLoop()才无需唤醒if (!isInLoopThread() || callingPendingFunctors_){
wakeup();}
}
关于唤醒时间??
代码片段3:EventLoop::loop()部分
文件名:EventLoop.ccwhile (!quit_){
activeChannels_.clear();pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);for (ChannelList::iterator it = activeChannels_.begin();it != activeChannels_.end(); ++it){
currentActiveChannel_ = *it;currentActiveChannel_->handleEvent(pollReturnTime_);}// 执行pending Functors_中的任务回调// 这种设计使得IO线程也能执行一些计算任务,避免了IO线程在不忙时长期阻塞在IO multiplexing调用中doPendingFunctors();}
doPendingFunctors()函数?
EventLoop::doPendingFunctors()
不是简单地在临界区依次调用Functor,而是把回调列表swap()到局部变量functors中,这样做,一方面减小了临界区的长度(不会阻塞其他线程调用queueInLoop()),另一方面避免了死锁(因为Functor可能再调用queueInLoop())。
代码片段4:EventLoop::doPendingFunctors()
文件名:EventLoop.ccvoid EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;callingPendingFunctors_ = true;// 把回调列表swap()到局部变量functors中{
MutexLockGuard lock(mutex_);functors.swap(pendingFunctors_);}// 依次执行回调列表中的函数for (size_t i = 0; i < functors.size(); ++i){
functors[i]();}callingPendingFunctors_ = false;
}
唤醒方式
传统的进程/线程间唤醒办法是用pipe或者socketpair,IO线程始终监视管道上的可读事件,在需要唤醒的时候,其他线程向管道中写一个字节,这样IO线程就从IO multiplexing阻塞调用中返回。pipe和socketpair都需要一对文件描述符,且pipe只能单向通信,socketpair可以双向通信。
muduo所采用的一种高效的进程/线程间事件通知机制:eventf
// 头文件
#include <sys/eventfd.h> // 为事件通知创建文件描述符
// 参数initval表示初始化计数器值
// 参数flags可取EFD_NONBLOCK非阻塞、EFD_CLOEXEC(设置close-on-exec属性,调用exec时会自动close)、EFD_SEMAPHORE 。。。
int eventfd(unsigned int initval, int flags);
它的高效体现在:一方面它比 pipe 少用一个 fd,节省了资源;另一方面,eventfd 的缓冲区管理也简单得多,全部buffer只有定长8 bytes,不像 pipe 那样可能有不定长的真正 buffer。
可以把这个eventfd
添加到poll
中,在需要唤醒时写入8字节数据,此时poll返回,执行回调函数,然后执行在pendingFunctors_
中的函数。???
代码片段5:EventLoop::wakeup()
文件名:EventLoop.ccvoid EventLoop::wakeup()
{
uint64_t one = 1;// 向wakupFd_中写入8字节从而唤醒,wakeupFd_即eventfd()所创建的文件描述符ssize_t n = ::write(wakeupFd_, &one, sizeof one);if (n != sizeof one){
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";}
}