本文转自CSDN博文《rust 实战 - 实现一个线程工作池 ThreadPool》,作者:firefantasy。
这是我读过的最简单、描述最清晰的 rust 线程池设计原理和示范代码,在此感谢作者辛勤付出,并向读者推荐该文。
如何实现一个线程池
线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
如何定义线程池Pool呢,首先最大线程数量肯定要作为线程池的一个属性,并且在new Pool时创建指定的线程。
1 线程池 Pool
pub struct Pool {
max_workers: usize, // 定义最大线程数
}impl Pool {
fn new(max_workers: usize) -> Pool {
}fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send {
}
}
用 execute 来执行任务,F: FnOnce() + "static + Send
是使用 thread::spawn
线程执行需要满足的 trait
, 代表F是一个能在线程里执行的闭包函数。
另一点自然而然会想到在 Pool 添加一个线程数组, 这个线程数组就是用来执行任务的。比如Vec<Thread> balabala
。这里的线程是活的,是一个个不断接受任务然后执行的实体。
可以看作在一个线程里不断执行获取任务并执行的 Worker。
struct Worker where
{
_id: usize, // worker 编号
}
要怎么把任务发送给 Worker 执行呢?mpsc(multi producer single consumer) 多生产者单消费者可以满足我们的需求,let (tx, rx) = mpsc::channel()
可以获取到一对发送端和接收端。
把发送端添加到 Pool 里面,把接收端添加到Worker里面。Pool通过channel将任务发送给多个worker消费执行。
这里有一点需要特别注意,channel 的接收端 receiver 需要安全的在多个线程间共享,因此需要用Arc<Mutex::<T>>
来包裹起来,也就是用锁来解决并发冲突。
2 Pool 的完整定义
pub struct Pool {
workers: Vec<Worker>,max_workers: usize,sender: mpsc::Sender<Message>
}
该是时候定义我们要发给Worker的消息Message了
定义如下的枚举值
type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,NewJob(Job),
}
Job 是一个要发送给 Worker 执行的闭包函数,这里 ByeBye 用来通知 Worker 可以终止当前的执行,退出线程。
只剩下实现 Worker 和 Pool 的具体逻辑了。
3 Worker的实现
impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let receiver = receiver.lock().unwrap();let message= receiver.recv().unwrap();match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);job();},Message::ByeBye => {
println!("ByeBye from worker[{}]", id);break},} }});Worker {
_id: id,t: Some(t),}}
}
let message = receiver.lock().unwrap().recv().unwrap();
这里获取锁后从receiver获取到消息体,然后 let message 结束后 rust 的生命周期会自动释放掉锁。
但如果写成
while let message = receiver.lock().unwrap().recv().unwrap() {
};
while let 后面整个括号都是一个作用域,要在这个作用域结束后,锁才会释放,比上面 let message 要锁定久时间。
rust 的 mutex 锁没有对应的unlock方法,由 mutex 的生命周期管理。
我们给 Pool 实现 Drop trait, 让 Pool 被销毁时,自动暂停掉 worker 线程的执行。
impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();}for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
t.join().unwrap();}}}
}
drop方法里面用了两个循环,而不是在一个循环里做完两件事?
for w in self.workers.iter_mut() {
if let Some(t) = w.t.take() {
self.sender.send(Message::ByeBye).unwrap();t.join().unwrap();}
}
这里面隐藏了一个会造成死锁的陷阱,比如两个 Worker, 在单个循环里面迭代所有 Worker,再将终止信息发送给通道后,直接调用 join,
我们预期是第一个 worker 要收到消息,并且等他执行完。当情况可能是第二个 worker 获取到了消息,第一个worker没有获取到,那接下来的 join 就会阻塞造成死锁。
注意到没有,Worker 是被包装在 Option 内的,这里有两个点需要注意
- t.join 需要持有 t 的所有权
- 在我们这种情况下,self.workers 只能作为引用被for循环迭代。
这里考虑让 Worker 持有 Option<JoinHandle<()>>
,后续可以通过在 Option
上调用 take
方法将 Some
变体的值移出来,并在原来的位置留下 None 变体。
换而言之,让运行中的 worker
持有 Some
的变体,清理 worker
时,可以使用 None
替换掉 Some
,从而让 Worker 失去可以运行的线程
struct Worker where
{
_id: usize,t: Option<JoinHandle<()>>,
}
4 要点总结
Mutex
依赖于生命周期管理锁的释放,使用的时候需要注意是否逾期持有锁Vec<Option<T>>
可以解决某些情况下需要T所有权的场景
5 完整代码
use std::thread::{
self, JoinHandle};
use std::sync::{
Arc, mpsc, Mutex};type Job = Box<dyn FnOnce() + "static + Send>;
enum Message {
ByeBye,NewJob(Job),
}struct Worker where
{
_id: usize,t: Option<JoinHandle<()>>,
}impl Worker
{
fn new(id: usize, receiver: Arc::<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let t = thread::spawn( move || {
loop {
let message = receiver.lock().unwrap().recv().unwrap();match message {
Message::NewJob(job) => {
println!("do job from worker[{}]", id);job();},Message::ByeBye => {
println!("ByeBye from worker[{}]", id);break},} }});Worker {
_id: id,t: Some(t),}}
}pub struct Pool {
workers: Vec<Worker>,max_workers: usize,sender: mpsc::Sender<Message>
}impl Pool where {
pub fn new(max_workers: usize) -> Pool {
if max_workers == 0 {
panic!("max_workers must be greater than zero!")}let (tx, rx) = mpsc::channel();let mut workers = Vec::with_capacity(max_workers);let receiver = Arc::new(Mutex::new(rx));for i in 0..max_workers {
workers.push(Worker::new(i, Arc::clone(&receiver)));}Pool {
workers: workers, max_workers: max_workers, sender: tx }}pub fn execute<F>(&self, f:F) where F: FnOnce() + "static + Send{
let job = Message::NewJob(Box::new(f));self.sender.send(job).unwrap();}
}impl Drop for Pool {
fn drop(&mut self) {
for _ in 0..self.max_workers {
self.sender.send(Message::ByeBye).unwrap();}for w in self.workers {
if let Some(t) = w.t.take() {
t.join().unwrap();}}}
}#[cfg(test)]
mod tests {
use super::*;#[test]fn it_works() {
let p = Pool::new(4);p.execute(|| println!("do new job1"));p.execute(|| println!("do new job2"));p.execute(|| println!("do new job3"));p.execute(|| println!("do new job4"));}
}
原文:rust 实战 - 实现一个线程工作池 ThreadPool
作者:firefantasy
来源:CSDN——中国程序员论坛