当前位置: 代码迷 >> 综合 >> 【muduo/base】线程
  详细解决方案

【muduo/base】线程

热度:40   发布时间:2023-12-12 09:37:43.0

#前言
muduo库也封装了线程Thread类,线程池ThreadPool。以及线程辅助类ThreadData,在这一部分我们可以学到如何创建线程,如何回收线程,加深对进程和线程的理解,以及代码规范方面的提升。

Thread类

头文件代码(去除了和命名相关的成员变量和函数)

class Thread : noncopyable
{
    public:typedef std::function<void ()> ThreadFunc;explicit Thread(ThreadFunc);~Thread();void start();int join();bool started() const {
     return started_; }pid_t tid() const {
     return tid_; }const string& name() const {
     return name_; }static int numCreated() {
     return numCreated_.get(); }private:bool       started_;bool       joined_;pthread_t  pthreadId_;pid_t      tid_;ThreadFunc func_;CountDownLatch latch_;};

这里我们简单归纳一下成员变量作用

线程状态相关: started_, joined_
线程信息相关 : pthreadId_,tid_
func_: 传入函数对象
代码执行相关 : latch_

到这里我们可能会疑惑:什么是tid?tid和threadid的区别是什么?

tid和threadid区别

在这里我们可以顺便复习一下操作系统相关知识
linux进程创建详细过程
进程和线程以及线程组
pid,tid,tgId区别

我们知道linux创建进程是通过(fork)分裂+替换(execv)生成新的进程。但是使用fork时,不知道你有没有考虑过下面这种情况:

进程A 创建了线程a,然后执行fork出了子进程B。那么这时候A创建的线程a,是否也会复制到进程B?
答案是不会?

这时我们是否会重新思考一下进程到底是什么?或许进程和线程不是一个包含关系?

这里做一下总结

  • 操作系统对进程的和线程调度算法并没有区别。进程和线程是地位相同的调度实体,在内核看来,他们都有一个统一的名字lwp(轻量级进程)
  • 线程组:是包含了多个lwp的集合,线程组id(tgid)为进程id,也就第一个lwp的pid
  • 进程组: 因为进程和线程对调度来看都一样,那么自然有进程组,进程组id(pgid)为父进程的pid
  • 我们使用ps打印的进程id,即线程组中第一个创建的lwp的pid,也即线程该lwp所在的线程组id
  • tid : tid就是lwp 的pid ,整个操作系统唯一
  • 线程id: 线程id是一个线程组内的唯一id 可以thread_self()获得。

typedef 和访问控制的关系

这里还有值得学习的地方,如typedef的位置
typedef定义为public ,用户可以直接使用Thread::ThreadFunc;定义为private,则仅可成员函数使用;定义为protected,则子类也可以使用

源文件代码

核心代码(删减非必要逻辑)

	struct ThreadData{
    typedef muduo::Thread::ThreadFunc ThreadFunc;ThreadFunc func_;string name_;pid_t *tid_;CountDownLatch *latch_;ThreadData(ThreadFunc func,const string &name,pid_t *tid,CountDownLatch *latch): func_(std::move(func)),name_(name),tid_(tid),latch_(latch){
    }void runInThread(){
    *tid_ = muduo::CurrentThread::tid();tid_ = NULL;latch_->countDown();latch_ = NULL;try{
    func_();}catch (...){
    throw; // rethrow}}};void *startThread(void *obj){
    ThreadData *data = static_cast<ThreadData *>(obj); //向下转型data->runInThread();delete data;return NULL;}Thread::Thread(ThreadFunc func): started_(false),joined_(false),func_(std::move(func)),latch_(1) {
    }Thread::~Thread(){
    if (started_ && !joined_){
    pthread_detach(pthreadId_);}}void Thread::start(){
    assert(!started_);started_ = true;// FIXME: move(func_)detail::ThreadData *data = new detail::ThreadData(func_, name_, &tid_, &latch_); // data这里是用来做传出参数的,相当于一个代理,更新thread的tid以及,取消start的堵塞if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))               //第三个参数为startThread()线程任务,传入参数void*{
    //失败started_ = false;delete data; // or no delete?LOG_SYSFATAL << "Failed in pthread_create";}else{
    // 成功latch_.wait();assert(tid_ > 0); //线程运行后,这里阻塞解除}}int Thread::join(){
    assert(started_);assert(!joined_);joined_ = true;return pthread_join(pthreadId_, NULL);}

ThreadData类

该类是一个辅助类,可以实现参数传入以及参数传出的功能,是一个代理类。创建线程的系统调用pthread_create,只能给线程绑定函数传入一个参数因此,需要一个额外类做参数包裹。

std::move

该函数功能可以将一个左值变为右值,如果某个类声明了右值构造函数(c++primer中有详细介绍),传参时不会经历对象的拷贝,而是将传入对象的控制权转移给新对象,并且将传入对象置为可析构状态,在适当情况下回收。使用右值引用需要慎重,如果传入对象被其他地方引用,可能会导致段错误。

代码规范

我们可以把只在当前源文件使用的类,定义在该源文件中

Thread类

一个执行的过程

创建thread对象threadA => threadA->start() => 创建threadData对象dataA => 系统调用创建线程,并绑定函数startThread => 创建线程成功 => 代码堵塞在 latch_.wait()处 => 线程startThread正式运行起来 => 调用传入参数dataA ->runInThread()方法 => 使threadA->latch_->countDown() => 主线程latch_.wait()检测到count<0,解除堵塞,返回执行调用threadA->start()之后的逻辑。 =》 主线程threadA->join() 堵塞在该处,直到线程任务结束线程资源回收完。

如果忘记调用了Join怎么办?

该类在Thread析构函数中调用了系统调用pthread_detach。该系统调用的作用是把该线程状态标记为detached状态,待线程执行完自动回收资源。对Thread对象销毁,但线程未被进行回收这种异常做了处理

  Thread::~Thread(){
    if (started_ && !joined_){
    pthread_detach(pthreadId_);}}

分析

析构中调用pthread_detach将pthread_create状态置为detached状态,线程结束后,会自动被系统回收

ThreadPool类

设计思路(简单的生产者消费者模型),共享资源:任务队列,使用run向任务队列中添加任务,线程从任务队列中取任务,执行。

class ThreadPool : noncopyable
{
    public:typedef std::function<void ()> Task;explicit ThreadPool();~ThreadPool();// Must be called before start().void setMaxQueueSize(int maxSize) {
     maxQueueSize_ = maxSize; }void setThreadInitCallback(const Task& cb){
     threadInitCallback_ = cb; }void start(int numThreads);void stop();size_t queueSize() const;void run(Task f);private:bool isFull() const REQUIRES(mutex_);void runInThread();Task take();mutable MutexLock mutex_;Condition notEmpty_ GUARDED_BY(mutex_);Condition notFull_ GUARDED_BY(mutex_);Task threadInitCallback_;std::vector<std::unique_ptr<muduo::Thread>> threads_;std::deque<Task> queue_ GUARDED_BY(mutex_);size_t maxQueueSize_;bool running_;
};

简单分析下类

  • setMaxQueueSize() : 设置任务队列的大小
  • setThreadInitCallback() : 设置初始化完毕回调,在线程正式运行起来后会调用该回调
  • start() : 启动线程池
  • run(Task f) : 添加任务
    条件变量 notEmpty_ ,notFull_ 最后讲解
源文件

start

void ThreadPool::start(int numThreads)
{
    assert(threads_.empty());running_ = true;threads_.reserve(numThreads);for (int i = 0; i < numThreads; ++i){
    threads_.emplace_back(new muduo::Thread(std::bind(&ThreadPool::runInThread, this), name_ + id)); //线程运行的时候回调runInThreadthreads_[i]->start();}//线程池没有额外设置线程if (numThreads == 0 && threadInitCallback_){
    threadInitCallback_();}
}
分析

根据numThreads创建Thread对象并运行。runInThread为消费者,从任务队列中取任务运行

代码规范

threads_是vector对象,vector每次扩张,会以当前元素*2扩张,使用reserve,直接>指定了长度,避免了可能的无效内存的分配,。在使用vector时如果,确定vector不会增长,更好的做法是如上述代码一样,直接预分配。

线程消费者任务

// 作为每个线程的任务
void ThreadPool::runInThread()
{
    try{
    if (threadInitCallback_){
    threadInitCallback_();//执行线程运行起来的回调函数} while (running_){
    Task task(take());if (task){
    task();}}}catch (...){
    }
}// 从任务队列获取一个任务
// 如果任务队列为空take将会堵塞,等待直到到任务队列不为空
ThreadPool::Task ThreadPool::take()
{
    MutexLockGuard lock(mutex_);// always use a while-loop, due to spurious wakeupwhile (queue_.empty() && running_){
    notEmpty_.wait();//如果任务队列空,挂起该线程}Task task;if (!queue_.empty()){
    task = queue_.front();queue_.pop_front();if (maxQueueSize_ > 0){
    notFull_.notify();}}return task;
}

生产者

//添加任务的,如果queue_满了,则堵塞等到queue被消费后继续添加
void ThreadPool::run(Task task)
{
    if (threads_.empty()){
    task();}else{
    MutexLockGuard lock(mutex_);while (isFull() && running_){
    notFull_.wait();}if (!running_)return;assert(!isFull());queue_.push_back(std::move(task));notEmpty_.notify();}
}
  while (queue_.empty() && running_){
    notEmpty_.wait();//如果任务队列空,挂起该线程}

分析下上面这块代码,while的目的是将该函数变为阻塞式函数,notEmpty_.wait(),挂起该线程,是为了避免对cpu的消耗。当线程([生产者线程)调用notEmpty_.notify(),唤醒调用notEmpty_.wait()堵塞的线程(消费者线程),消费者线程执行while,这时候任务队列不为空了,解除堵塞,执行下面逻辑。

  相关解决方案