当前位置: 代码迷 >> 综合 >> muduo net库学习笔记7——用于创建服务器的类TcpServer
  详细解决方案

muduo net库学习笔记7——用于创建服务器的类TcpServer

热度:27   发布时间:2024-02-10 18:28:32.0

muduo为每个EventLoop设计了runInLoopqueueInLoop函数用来将本该在其他线程执行的线程不安全函数放到它所属线程执行,从而达到线程安全。

muduo采用采用one loop per thread的设计思想,即每个线程运行一个循环,这里的循环也就是事件驱动循环EventLoop。所以,EventLoop对象的loop函数,包括间接引发的Pollerpoll函数,Channel的handleEvent函数,以及TcpConnectionhandle*函数都是在一个线程内完成的。而在整个muduo体系中,有着多个这样的EventLoop,每个EventLoop都执行着loop,poll,handleEvent,handle*这些函数。这种设计模型也被成为Reactor + 线程池

控制着这些EventLoop的,换句话说,保存着事件驱动循环线程池的,就是TcpServer类,服务器类。TcpServer类创建一个高并发的服务器,内部有一个线程池,线程池中有大量的线程,每个线程运行着一个事件驱动循环,one loop per thread

另外,TcpServer本身也是一个线程(主线程),也运行着一个EventLoop,这个事件驱动循环仅仅用来监控客户端的连接请求,即Acceptor对象的Channel的可读事件。通常如果用户添加定时器任务的话,也会由这个EventLoop监听
但是TcpServer的这个EventLoop不在线程池中,这一点要区分开,线程池中的线程只用来运行负责监控TcpConnection的EventLoop的


TcpServer的成员变量主要以EventLoopAcceptorTcpConnection, EventLoopThreadLoop为主,其它变量主要是各种用户提供的回调函数,大多都传给每一个TcpConnection对象

typedef std::map<string, TcpConnectionPtr> ConnectionMap;EventLoop* loop_;  // the acceptor loop//TcpServer所在的主线程下运行的事件驱动循环,负责监听Acceptor的Channel const string ipPort_;//服务器负责监听的本地ip和端口const string name_;//服务器名字,创建时传入std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptor//Acceptor对象,负责监听客户端连接请求,运行在主线程的EventLoop中std::shared_ptr<EventLoopThreadPool> threadPool_;//事件驱动线程池,池中每个线程运行一个EventLoop ConnectionCallback connectionCallback_;//用户传入,有tcp连接到达或tcp连接关闭时调用,传给TcpConnection MessageCallback messageCallback_;//用户传入,对端发来消息时调用,传给TcpConnectionWriteCompleteCallback writeCompleteCallback_;//成功写入内核tcp缓冲区后调用,传给TcpConnectionThreadInitCallback threadInitCallback_;//线程池初始化完成后调用,传给EventLoopThreadPool,再传给每个EventLoopThreadAtomicInt32 started_;// always in loop threadint nextConnId_;//TcpConnection特有id,每增加一个TcpConnection,nextConnId_加一 ConnectionMap connections_;//所有的TcpConnection对象,智能指针
};
  • 事件驱动循环EventLoop如上所说,运行在主线程,只用来监听客户端连接请求(Acceptor),不负责监听TcpConnection
  • 事件驱动循环线程池EventLoopThreadPool用于分发线程,当新建TcpConnection时,从线程池中选出一个事件驱动循环线程负责这个TcpConnection
  • connections_std::map<string, shared_ptr<TcpConnection> >类型,在新建TcpConnection后会添加到这个map中,可以保证每个TcpConnection在添加后引用计数为1,保证生命期的正常管理。在tcp连接关闭后会从map中删除TcpConnection,使引用计数减1,为了防止引用计数为0导致没有从TcpConnection的函数中返回就已经将TcpConnection销毁,在removeConnectionInLoop中使用std::bind延长了TcpConnection的生命期,
    ????
    为了防止引用计数为0导致没有从TcpConnection的函数中返回就已经将TcpConnection销毁,在removeConnectionInLoop中使用std::bind延长了TcpConnection的生命期,

构造函数主要任务是为Acceptor设置回调函数,当有新的客户端连接时由Acceptor接收后调用这个回调函数

TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option): loop_(CHECK_NOTNULL(loop)),ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),threadPool_(new EventLoopThreadPool(loop, name_)),connectionCallback_(defaultConnectionCallback),messageCallback_(defaultMessageCallback),nextConnId_(1)
{/* * 设置回调函数,当有客户端请求时,Acceptor接收客户端请求,然后调用这里设置的回调函数* 回调函数用于创建TcpConnection连接*/acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, _1, _2));
}

回调函数newConnection用于创建一个TcpConnection,将其添加到connections_中,然后设置各种用户提供的回调函数(有数据可读时/连接关闭时/出现错误时等)

Acceptor接收客户端请求后获得客户端套接字以及客户端地址
Acceptor只负责接收客户端请求, TcpServer需要生成一个TcpConnection用于管理tcp连接

1,TcpServer内有一个EventLoopThreadPool,即事件循环线程池,池子中每个线程都是一个EventLoop
2,每个EventLoop包含一个Poller用于监听注册到这个EventLoop上的所有Channel
3,当建立起一个新的TcpConnection时,这个连接会放到线程池中的某个EventLoop中
4,TcpServer中的baseLoop只用来检测客户端的连接

TcpServer::newConnection


void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{loop_->assertInLoopThread();EventLoop* ioLoop = threadPool_->getNextLoop();//从事件驱动线程池中取出一个线程给TcpConnectionchar buf[64];snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);++nextConnId_;string connName = name_ + buf;//为TcpConnection生成独一无二的名字LOG_INFO << "TcpServer::newConnection [" << name_<< "] - new connection [" << connName<< "] from " << peerAddr.toIpPort();InetAddress localAddr(sockets::getLocalAddr(sockfd));//根据sockfd获取tcp连接在本地的<地址,端口>// FIXME poll with zero timeout to double confirm the new connection// FIXME use make_shared if necessaryTcpConnectionPtr conn(new TcpConnection(ioLoop,connName,sockfd,localAddr,peerAddr));connections_[connName] = conn;//添加到所有tcp 连接的map中,键是tcp连接独特的名字(服务器名+客户端<地址,端口>)conn->setConnectionCallback(connectionCallback_);//为tcp连接设置回调函数(由用户提供)conn->setMessageCallback(messageCallback_);conn->setWriteCompleteCallback(writeCompleteCallback_);conn->setCloseCallback(std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe/* * 关闭回调函数,由TcpServer设置,作用是将这个关闭的TcpConnection从map中删除* 当poll返回后,发现被激活的原因是EPOLLHUP,此时需要关闭tcp连接* 调用Channel的CloseCallback,进而调用TcpConnection的handleClose,进而调用removeConnection*/ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

连接建立后,调用TcpConnection连接建立成功的函数:

  1. 新建的TcpConnection所在事件循环是在事件循环线程池中的某个线程
  2. 所以TcpConnection也就属于它所在的事件驱动循环所在的那个线程
  3. 调用TcpConnection的函数时也就应该在自己所在线程调用
  4. 所以需要调用runInLoop在自己的那个事件驱动循环所在线程调用这个函数
  5. 当前线程是TcpServer的主线程,不是TcpConnection的线程,如果在这个线程直接调用会阻塞监听客户端请求
  6. 其实这里不是因为线程不安全,即使在这个线程调用也不会出现线程不安全,因为TcpConnection本就是由这个线程创建的

两个比较重要的调用:

  1. EventLoop* ioLoop = threadPool_->getNextLoop();从事件驱动线程池中取出一个事件驱动循环,用于监听新建的TcpConnection连接(每个TcpConnection都应该属于某个EventLoop)
  2. ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));,在TcpConnection所属的那个事件驱动循环(线程)中执行connectEstablished函数,用于TcpConnection的一些初始化工作。这里不是由于线程不安全,而是为了不阻塞主线程(TcpServer线程)处理客户端连接请求

round_robin算法来选择一个线程

EventLoop* ioLoop = threadPool_->getNextLoop();如下,采用round_robin算法来选择一个线程,其实就是在线程池中挨着取每一个线程,这次取第一个,下次就取第二个…取到最后一个后再从第一个开始取

/* 从线程池中取出一个线程,挨着取 */
EventLoop* EventLoopThreadPool::getNextLoop()
{baseLoop_->assertInLoopThread();assert(started_);/* 线程池所在线程,TcpServer的主线程 */EventLoop* loop = baseLoop_;/* * 如果不为空,取出一个* 如果为空,说明线程池中没有创建线程,是单线程程序,返回主线程*/if (!loops_.empty()){// round-robin/* loops_保存所有的线程 */loop = loops_[next_];++next_;if (implicit_cast<size_t>(next_) >= loops_.size()){next_ = 0;}}return loop;
}

用户程序代码使用TcpServer的例子

void onConnection(const muduo::net::TcpConnectionPtr& conn)
{...
}void onMessage(const muduo::net::TcpConnectionPtr& conn,muduo::net::Buffer* buf,muduo::Timerstamp time)
{...
}int main()
{muduo::net::EventLoop loop;//需要 一个事件驱动循环muduo::net::InetAddress listenAddr(2007);//需要创建监听地址和端口TcpServer server(&loop, listenAddr);//创建TcpServer并传入上面两server.setConnectionCallback(std::bind(onConnection, _1));//设置各种回调函数server.setMessageCallback(std::bind(onMessage, _1, _2, _3));server.start();//开启服务器loop.loop();//调用EventLoop::loop()开始事件驱动循环return 0;
}

TcpServer中的成员变量loop_就是这里创建的EventLoop,是主线程的事件驱动循环。
需要注意的是开启服务器监听用到的是EventLoop::loop()函数,TcpServer::start()是用来启动内部事件驱动线程池的

void TcpServer::start()
{if (started_.getAndSet(1) == 0){threadPool_->start(threadInitCallback_);/* 启动线程池 */assert(!acceptor_->listenning());loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));/* * Acceptor和TcpServer在同一个线程,通常会直接调用 * std::bind只能值绑定,如果传入智能指针会增加引用计数,这里传递普通指针* 因为TcpServer没有销毁,所以不用担心Accepor会销毁*/}
}

开启事件驱动循环线程池,将Acceptor的Channel添加到EventLoop中,注册到Poller上,此时还没有调用EventLoop::loop(),所以还没有开启监听