当前位置: 代码迷 >> 综合 >> muduo net库学习笔记8——件驱动循环线程池EventLoopThreadPool
  详细解决方案

muduo net库学习笔记8——件驱动循环线程池EventLoopThreadPool

热度:54   发布时间:2023-11-04 18:39:35.0

感谢并转载自https://blog.csdn.net/sinat_35261315/article/details/78376821

线程池的作用体现在

  • 用户启动TcpServer服务器时创建大量子线程,每个子线程创建一个EventLoop并开始执行EventLoop::loop
  • 主线程的线程池保存着创建的这些线程和EventLoop
  • Acceptor接收到客户端的连接请求后返回TcpServerTcpServer创建TcpConnection用于管理tcp连接
  • TcpServer从事件驱动线程池中取出一个EventLoop,并将这个EventLoop传给TcpConnection的构造函数
  • TcpConnection创建用于管理套接字的Channel并注册到从线程池中取出的EventLoopPoller
  • 服务器继续监听

这个池子既是一个线程池,又是一个EventLoop池,二者是等价的,一个EventLoop对应一个线程


线程池中比较复杂的地方是由主线程创建子线程,子线程创建EventLoop并执行EventLoop::loop,主线程返回创建的EventLoop给线程池并保存起来,比较绕
(主线程创子线程, 子线程创建EventLoop,主线程返回创建的EventLoop给线程池并保存起来)

EventLoopThreadPool定义

class EventLoopThreadPool : noncopyable
{
    public:typedef std::function<void(EventLoop*)> ThreadInitCallback;EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);~EventLoopThreadPool();void setThreadNum(int numThreads) {
     numThreads_ = numThreads; }void start(const ThreadInitCallback& cb = ThreadInitCallback());//开启线程池,创建线程// valid after calling start()/// round-robinEventLoop* getNextLoop();//获取一个线程(事件驱动循环),通常在创建TcpConnection时调用 /// with the same hash code, it will always return the same EventLoopEventLoop* getLoopForHash(size_t hashCode);std::vector<EventLoop*> getAllLoops();bool started() const{
     return started_; }const string& name() const{
     return name_; }private:EventLoop* baseLoop_;//主线程的事件驱动循环,TcpServer所在的事件驱动循环,创建TcpServer传入的EventLoopstring name_;bool started_;int numThreads_;//线程数int next_;//标记下次应该取出哪个线程,采用round_robinstd::vector<std::unique_ptr<EventLoopThread>> threads_;//线程池中的所有线程std::vector<EventLoop*> loops_;/* * 线程池中每个线程对应的事件驱动循环,从线程池取出线程实际上返回的是事件驱动循环* 每个事件驱动循环运行在一个线程中*/
};
  • baseLoop_是主线程所在的事件驱动循环,即TcpServer所在的那个主线程,这个事件驱动循环通常只负责监听客户端连接请求,即AcceptorChannel
  • 两个vector保存着所有子线程即每个子线程对应的EventLoop。事件驱动循环线程被封装在EventLoopThread中,EventLoopThread中使用的Thread才是真正的线程封装

线程池是由TcpServer启动的,在TcpServer::start函数中(由用户调用)

/* * 开启事件驱动循环线程池,将Acceptor的Channel添加到EventLoop中,注册到Poller上* 此时还没有调用EventLoop::loop(),所以还没有开启监听*/
void TcpServer::start()
{
    if (started_.getAndSet(1) == 0){
    /* 启动线程池 */threadPool_->start(threadInitCallback_);assert(!acceptor_->listenning());/* * Acceptor和TcpServer在同一个线程,通常会直接调用 * std::bind只能值绑定,如果传入智能指针会增加引用计数,这里传递普通指针* 因为TcpServer没有销毁,所以不用担心Accepor会销毁*/loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));}
}

TcpServer::start中调用threadPool_->start()用于启动线程池,传入的参数是创建好所有线程后调用的回调函数,也是由用户提供

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
    assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 创建一定数量的线程(事件驱动循环) */for (int i = 0; i < numThreads_; ++i){
    char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驱动循环线程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 创建新线程,返回新线程的事件驱动循环EventLoop */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){
    cb(baseLoop_);}
}
  1. 线程池所在线程是主线程,所有通过pthread_create创建的线程为子线程
  2. 创建线程首先构造EventLoopThread对象,这里保存着一个事件驱动循环loop_和线程Thread,但是事件驱动循环初始为nullThread初始时设置了回调函数,在创建完线程后执行
  3. 执行EventLoopThread::startLoop函数启动Thread创建子线程,主线程返回阻塞在loop_上因为它为null,子线程执行回调函数创建EventLoop并赋值给loops_,同时唤醒主线程
  4. 主线程返回loops_,子线程执行EventLoop::loop开始监听事件
/* * 线程池所在线程在每创建一个EventLoopThread后会调用相应对象的startLoop函数,注意主线程和子线程之分* 主线程是TcpServer所在线程,也是线程池所在线程* 子线程是由线程池通过pthread_create创建的线程,每一个子线程运行一个EventLoop::loop * * 1.主线程EventLoopThreadPool创建EventLoopThread对象* 2.主线程EventLoopThread构造函数中初始化线程类Thread并传递回调函数EventLoopThread::threadFunc* 3.主线程EventLoopThreadPool创建完EventLoopThread后,调用EventLoopThread::startLoop函数* 4.主线程EventLoopThread::startLoop函数开启线程类Thread,即调用Thread::start* 5.主线程Thread::start函数中使用pthread_create创建线程后* 子线程调用回调函数EventLoopThread::threadFunc,主线程返回到EventLoopThread::startLoop* 6.主线程EventLoopThread::startLoop由于当前事件驱动循环loop_为null(构造时初始化为null)导致wait* 7.子线程EventLoopThread::threadFunc创建EventLoop并赋值给loop_,然后唤醒阻塞在cond上的主线程* 8.主线程EventLoopThread::startLoop被唤醒后,返回loop_给EventLoopThreadPool* 9.主线程EventLoopThreadPool保存返回的loop_,存放在成员变量std::vector<EventLoop*> loops_中* 10.子线程仍然在threadFunc中,调用EventLoop::loop函数,无限循环监听*/EventLoop* EventLoopThread::startLoop()
{
    assert(!thread_.started());/* 主线程调用线程类的start函数,创建线程 */thread_.start();{
    /* 加锁,原因是loop_可能会被子线程更改 */MutexLockGuard lock(mutex_);/** 如果loop_仍然为null,说明子线程那边还没有进入threadFunc创建EventLoop,wait等待* pthread_cond_wait(pthread_cond_t&, pthread_mutex_t&);会自动解锁,然后睡眠* 等待某个线程使用pthread_cond_signal(&pthread_cond_t&);或pthread_cond_boardcast(pthread_cond_t&)* 唤醒wait的一个/全部线程* 当主线程从wait中返回后,子线程已经创建了EventLoop,主线程返回到EventLoopThreadPool中* 子线程执行EventLoop::loop函数监听事件*/while (loop_ == NULL){
    cond_.wait();}}return loop_;
}

Thread类通过调用pthread_create创建子线程

/* EventLoopThread::startLoop函数中调用,用于创建子线程 */
void Thread::start()
{
    assert(!started_);started_ = true;// FIXME: move(func_)/* 创建子线程为线程函数提供的参数,封装起来就可以实现传递多个参数 */detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);/* 创建线程,子线程调用detail::startThread,主线程继续向下执行 */if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)){
    started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{
    /* 如果线程创建成功,主线程阻塞在这里 */latch_.wait();assert(tid_ > 0);}
}

创建成功后,主线程阻塞在latch_.wait()上(条件变量),等待子线程执行threadFunc之前被唤醒
ThreadData是线程数据类,将线程函数用到的所有参数都存在这里面即可,线程函数为detail::startThread,进而调用runInThread

/* 创建线程后调用的函数,data是参数 */
void* startThread(void* obj)
{
    ThreadData* data = static_cast<ThreadData*>(obj);data->runInThread();delete data;return NULL;
}

runInLoop调用latch_->countDown,此时会唤醒主线程,主线程回到startLoop中由于loop_为null阻塞在wait上。而此时子线程正在准备调用threadFunc(在EventLoopThread创建之初将EventLoopThread::threadFunc传递给Thread,用于在创建完线程后调用)

  /* * 创建线程后间接调用的函数,用于执行EventLoopThread::threadFunc* 这个函数在EventLoopThread构造时传给Thread对象的* EventLoopThread::startLoop函数中调用Thread对象的Thread::start函数* Thread::start中创建子线程,子线程调用detail::startThread,进而调用detail::runInThread* detail::runInLoop调用EventLoopThread::threadFunc,创建EventLoop,唤醒主线程,子线程执行loop循环* 转了一大圈又回到EventLoopThread中*/void runInThread(){
    *tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown();latch_ = NULL;muduo::CurrentThread::t_threadName = name_.empty() ? "muduoThread" : name_.c_str();/* 给当前线程命名 */::prctl(PR_SET_NAME, muduo::CurrentThread::t_threadName);try{
    /* EventLoopThread::threadFunc() */func_();muduo::CurrentThread::t_threadName = "finished";}catch (const Exception& ex){
    muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());fprintf(stderr, "stack trace: %s\n", ex.stackTrace());abort();}catch (const std::exception& ex){
    muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "exception caught in Thread %s\n", name_.c_str());fprintf(stderr, "reason: %s\n", ex.what());abort();}catch (...){
    muduo::CurrentThread::t_threadName = "crashed";fprintf(stderr, "unknown exception caught in Thread %s\n", name_.c_str());throw; // rethrow}}

兜兜转转又回到了EventLoopThread,此时主线程阻塞在EventLoopThread::startInLoop的wait上,子线程在EventLoopThread::threadFunc中,准备创建一个EventLoop然后唤醒主线程,并开启事件循环

/* * 传递给线程的回调函数,当创建线程后,在detail::runInLoop会回调这个函数* 此函数创建一个事件驱动循环,并开启事件监听(loop)*/
void EventLoopThread::threadFunc()
{
    /* 子线程创建事件驱动循环 */EventLoop loop;if (callback_){
    callback_(&loop);}{
    /* 上锁后赋值给loop_ */MutexLockGuard lock(mutex_);loop_ = &loop;/* * pthread_cond_signal(pthread_cond_t&)唤醒一个wait的线程* 此时主线程发现loop_已经不为null,随后返回到EventLoopThreadPool中*/cond_.notify();}/* 子线程开启事件监听,进入无限循环,不返回 */loop.loop();//assert(exiting_);loop_ = NULL;
}

子线程将一直停留在loop.loop()上,主线程由于被子线程唤醒,发现loop_已经不为null,说明已经创建了一个线程,同时也在那个线程中创建了一个事件驱动循环,所以主线程返回,将创建好的事件驱动循环返回给线程池保存起来,当有新的TcpConnection被创建后取出一个用来监听tcp连接

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
    assert(!started_);baseLoop_->assertInLoopThread();started_ = true;/* 创建一定数量的线程(事件驱动循环) */for (int i = 0; i < numThreads_; ++i){
    char buf[name_.size() + 32];snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);/* EventLoopThread,事件驱动循环线程*/EventLoopThread* t = new EventLoopThread(cb, buf);threads_.push_back(std::unique_ptr<EventLoopThread>(t));/* 创建新线程,返回新线程的事件驱动循环EventLoop *//* EventLoopThread主线程返回后,将事件驱动循环保存下来,然后继续创建线程 */loops_.push_back(t->startLoop());}if (numThreads_ == 0 && cb){
    cb(baseLoop_);}
}

至此线程池的创建工作完成,每一个线程都运行着EventLoop::loop,进行EventLoop::loop -> Poller::poll -> Channel::handleEvent -> TcpConnection::handle* -> EventLoop::doPendingFunctors -> EventLoop::loop的工作。
如果提供了回调函数,在创建完成后也会执行,但通常用户不会在意线程池的创建工作,所以一般都不提供


创建完成后,线程池唯一的工作就是在新建TcpConnection时从池子中取出一个EventLoop传给TcpConnection

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
    loop_->assertInLoopThread();/* 从事件驱动线程池中取出一个线程给TcpConnection */EventLoop* ioLoop = threadPool_->getNextLoop();/* 为TcpConnection生成独一无二的名字 */char buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();/* * 根据sockfd获取tcp连接在本地的<地址,端口>* getsockname(int fd, struct sockaddr*, int *size);*/InetAddress localAddr(sockets::getLocalAddr(sockfd));// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessary/* 创建一个新的TcpConnection代表一个Tcp连接 */TcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));...
}

EventLoopThreadPool::getNextLoop函数如下,用于取出一个EventLoop。
如果线程池中没有线程,就返回主线程的EventLoop,此时只有一个EventLoop在运行,即TcpServer的那个

/* 从线程池中取出一个线程,挨着取 */
EventLoop* EventLoopThreadPool::getNextLoop()
{
    baseLoop_->assertInLoopThread();assert(started_);/* 线程池所在线程,TcpServer的主线程 */EventLoop* loop = baseLoop_;/* * 如果不为空,取出一个* 如果为空,说明线程池中没有创建线程,是单线程程序,返回主线程*/if (!loops_.empty()){
    // round-robin/* loops_保存所有的线程 */loop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()){
    next_ = 0;}}return loop;
}