当前位置: 代码迷 >> 综合 >> thrift 源码分析 ThreadManager
  详细解决方案

thrift 源码分析 ThreadManager

热度:11   发布时间:2024-01-14 08:16:57.0
ThreadManager

这个类主要负责管理 thrift 中的 worker(工作)线程。

构造函数

ThreadManager 是个抽象类,所以我们分析它的子类;这里主要分析由 newSimpleThreadManager 创建的 SimpleThreadManager。

newSimpleThreadManager

这里的 workerCount 就是初始化线程管理类时指定的 worker 线程数量(默认值为 4)。

// Excerpt: the concrete manager derives from ThreadManager::Impl.
// The class body is elided ("...") in this article.
class SimpleThreadManager : public ThreadManager::Impl {...
}
// Factory: builds a SimpleThreadManager from the given worker count and
// pending-task limit and returns it as a shared_ptr<ThreadManager>.
// Both arguments have defaults (4 workers, limit 0).
static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count = 4,
size_t pendingTaskCountMax = 0)
{
  return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
}
// Constructor of SimpleThreadManager (shown outside the elided class body in
// this excerpt). It only records the two values in workerCount_ and
// pendingTaskCountMax_; nothing else happens here.
SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
  : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax)
{}
start函数

如下所示,ThreadManager 的 start、stop 等函数都是纯虚函数,需要由具体的子类重写后才能调用。我们分析 SimpleThreadManager 重写的函数即可。

// Excerpt of the abstract base class: start() is declared pure virtual
// (= 0), so a concrete subclass has to provide the implementation.
class ThreadManager  {
  virtual void start() = 0;
}
SimpleThreadManager::start
  void start() {ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_); // 0 代表任务队列没有上限ThreadManager::Impl::start();addWorker(workerCount_);}------------------------------------------------------------------------------------------void ThreadManager::Impl::start() {if (state_ == ThreadManager::STOPPED) {return;}{Synchronized s(monitor_);if (state_ == ThreadManager::UNINITIALIZED) {if (!threadFactory_) {throw InvalidArgumentException();}state_ = ThreadManager::STARTED;monitor_.notifyAll();}while (state_ == STARTING) {monitor_.wait(); // wait(0) == waitForever 里面实际调用了 pthread_wait_cond 等待主线程唤醒}}
}
----------------------------------------------------------------------------------------------
void ThreadManager::Impl::addWorker(size_t value) {std::set<shared_ptr<Thread> > newThreads;for (size_t ix = 0; ix < value; ix++) {shared_ptr<ThreadManager::Worker> worker= shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));newThreads.insert(threadFactory_->newThread(worker)); // 工厂函数生成响应的线程类}{Synchronized s(monitor_);workerMaxCount_ += value;workers_.insert(newThreads.begin(), newThreads.end());}for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();++ix) {shared_ptr<ThreadManager::Worker> worker= dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());worker->state_ = ThreadManager::Worker::STARTING;(*ix)->start(); // 返回的具体线程类执行start函数,里面调用了pthread_create 生成线程idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));}{Synchronized s(workerMonitor_);while (workerCount_ != workerMaxCount_) {workerMonitor_.wait();}}
}

上面 addWorker 中调用的 (*ix)->start() 才会真正生成线程。现在以 PosixThreadFactory 为例子,看看它生成的线程类(PthreadThread)的 start 是怎么实现的。

  void start() {if (state_ != uninitialized) {return;}pthread_attr_t thread_attr;if (pthread_attr_init(&thread_attr) != 0) {throw SystemResourceException("pthread_attr_init failed");}if (pthread_attr_setdetachstate(&thread_attr,detached_ ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE)!= 0) {throw SystemResourceException("pthread_attr_setdetachstate failed");}// Set thread stack sizeif (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {throw SystemResourceException("pthread_attr_setstacksize failed");}// Set thread policy
#ifdef _WIN32// WIN32 Pthread implementation doesn't seem to support sheduling policies other then// PosixThreadFactory::OTHER - runtime errorpolicy_ = PosixThreadFactory::OTHER;
#endifif (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {throw SystemResourceException("pthread_attr_setschedpolicy failed");}struct sched_param sched_param;sched_param.sched_priority = priority_;// Set thread priorityif (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {throw SystemResourceException("pthread_attr_setschedparam failed");}// Create referenceshared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();*selfRef = self_.lock();state_ = starting;if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {throw SystemResourceException("pthread_create failed");}}
void* PthreadThread::threadMain(void* arg) {shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);if (thread == NULL) {return (void*)0;}if (thread->state_ != starting) {return (void*)0;}#if GOOGLE_PERFTOOLS_REGISTER_THREADProfilerRegisterThread();
#endifthread->state_ = started;thread->runnable()->run(); //这个runnable 就是 ThreadManager 中的worker请看 addWorker 函数中的操作//这里的runnable返回的就是workerif (thread->state_ != stopping && thread->state_ != stopped) {thread->state_ = stopping;}return (void*)0;
}

接下来看下 Worker::run,主要就是从任务队列中取出 task,然后执行 task->run()。

void run() {bool active = false;bool notifyManager = false;/*** Increment worker semaphore and notify manager if worker count reached* desired max** Note: We have to release the monitor and acquire the workerMonitor* since that is what the manager blocks on for worker add/remove*/{Synchronized s(manager_->monitor_);active = manager_->workerCount_ < manager_->workerMaxCount_;if (active) {manager_->workerCount_++;notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;}}if (notifyManager) {Synchronized s(manager_->workerMonitor_);manager_->workerMonitor_.notify();notifyManager = false;}while (active) {shared_ptr<ThreadManager::Task> task;/*** While holding manager monitor block for non-empty task queue (Also* check that the thread hasn't been requested to stop). Once the queue* is non-empty, dequeue a task, release monitor, and execute. If the* worker max count has been decremented such that we exceed it, mark* ourself inactive, decrement the worker count and notify the manager* (technically we're notifying the next blocked thread but eventually* the manager will see it.*/{// 全局锁 lock 获取 元素// Server I/O 线程 add 的时候也会使用这个锁// 也就是说 Work 和 I/O 都会争用一把锁Guard g(manager_->mutex_);active = isActive();while (active && manager_->tasks_.empty()) {manager_->idleCount_++;idle_ = true;manager_->monitor_.wait();active = isActive();idle_ = false;manager_->idleCount_--;}if (active) {manager_->removeExpiredTasks();if (!manager_->tasks_.empty()) {task = manager_->tasks_.front();manager_->tasks_.pop();if (task->state_ == ThreadManager::Task::WAITING) {task->state_ = ThreadManager::Task::EXECUTING;}}/* If we have a pending task max and we just dropped below it, wakeup anythread that might be blocked on add. 
*/if (manager_->pendingTaskCountMax_ != 0&& manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {manager_->maxMonitor_.notify();}} else {idle_ = true;manager_->workerCount_--;notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);}}if (task) {if (task->state_ == ThreadManager::Task::EXECUTING) {try {task->run();} catch (const std::exception& e) {GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());} catch (...) {GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");}}}}{Synchronized s(manager_->workerMonitor_);manager_->deadWorkers_.insert(this->thread());if (notifyManager) {manager_->workerMonitor_.notify();}}return;}
task::run

每个 server 的 task 实现不同,我们看看 TNonblockingServer 的 task:它主要执行 process 函数,也就是执行我们设置的处理函数。

class TNonblockingServer::TConnection::Task : public Runnable {void run();};TNonblockingServer::TConnection::Task::run()
{try {for (;;) {if (serverEventHandler_) {serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());}// 执行成功break;if (!processor_->process(input_, output_, connectionContext_)|| !input_->getTransport()->peek()) {break;}}} catch (const TTransportException& ttx) {GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());} catch (const bad_alloc&) {GlobalOutput("TNonblockingServer: caught bad_alloc exception.");exit(1);} catch (const std::exception& x) {GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",typeid(x).name(),x.what());} catch (...) {GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");}// Signal completion back to the libevent thread via a pipeif (!connection_->notifyIOThread()) {GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");connection_->close();throw TException("TNonblockingServer::Task::run: failed write on notify pipe");}}