- 简介
生产者-消费者模式是经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。在生产者-消费者模式中,有两类线程:若干个生产者线程和若干个消费者线程。生产者负责提交用户请求,消费者用于具体的处理生产者提交的任务。生产者和消费者通过共享内存缓冲区进行数据通信。
生产者-消费者模式的基本结构如下图:
通过上图可以看出,生产者和消费者通过共享内存缓冲区进行通信,他们之间并不进行直接的通信,从而减少了他们之间的耦合,生产者不需要直到消费者的存在,消费者也不需要知道生产者的存在。内存缓冲区主要的功能是实现数据在多线程间的共享,此外,通过该缓冲区,还可以缓解生产者和消费者之间的性能差异。
- 生产者-消费者模式实现
下面以生产者-消费者模式的简单实现,介绍该模式的优点:
生产者代码:
1 public class Producer implements Runnable { 2 3 private volatile boolean isRunnig = true; 4 5 private BlockingQueue<PCData> queue;//缓冲队列 6 7 private static AtomicInteger count = new AtomicInteger(); 8 9 private static final int SLEEPTIME = 1000;10 11 12 public Producer(BlockingQueue<PCData> queue) {13 this.queue = queue;14 }15 16 17 @Override18 public void run() {19 PCData pcData = null;20 Random r = new Random();21 22 System.out.println("start producer id:"+Thread.currentThread().getId());23 24 try {25 while(true){26 Thread.sleep(r.nextInt(SLEEPTIME));27 pcData = new PCData(count.incrementAndGet());28 System.out.println(pcData +"is put into queue");29 if(!queue.offer(pcData, 2, TimeUnit.SECONDS)){30 System.err.println("fail to put data:"+pcData);31 }32 }33 } catch (Exception e) {34 // TODO: handle exception35 e.printStackTrace();36 Thread.currentThread().interrupt();37 }38 39 }40 41 public void stop(){42 isRunnig = false;43 }44 45 46 }
消费者代码:
1 public class Consumer implements Runnable { 2 3 private BlockingQueue<PCData> queue;//缓冲队列 4 5 private static final int SLEEPTIME=1000; 6 7 8 9 public Consumer(BlockingQueue<PCData> queue) {10 this.queue = queue;11 }12 13 14 15 @Override16 public void run() {17 System.out.println("start constomer id:"+Thread.currentThread().getId());18 Random r = new Random();19 try {20 while(true){21 PCData data = queue.take();22 int re = data.getIntData()*data.getIntData();23 System.out.println(MessageFormat.format("{0}*{1}={2}", data.getIntData(),data.getIntData(),re));24 25 Thread.sleep(r.nextInt(SLEEPTIME));26 }27 } catch (Exception e) {28 // TODO: handle exception29 e.printStackTrace();30 Thread.currentThread().interrupt();31 }32 33 }34 35 }
消费者、生产者之间的共享数据模型:
1 public final class PCData { 2 private final int intData; 3 4 public PCData(int intData) { 5 this.intData = intData; 6 } 7 8 public PCData(String strData){ 9 this.intData = Integer.valueOf(strData);10 }11 12 public synchronized int getIntData() {13 return intData;14 }15 16 @Override17 public String toString() {18 return "data:" + intData;19 }20 21 22 23 }
在客户端中,启动3个消费者和3个生产者,并让他们协作运行:
1 public class Client { 2 public static void main(String[] args) throws InterruptedException { 3 BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10); 4 5 Producer p1 = new Producer(queue); 6 Producer p2 = new Producer(queue); 7 Producer p3 = new Producer(queue); 8 9 Consumer c1 = new Consumer(queue);10 Consumer c2 = new Consumer(queue);11 Consumer c3 = new Consumer(queue);12 13 ExecutorService exe = Executors.newCachedThreadPool();14 exe.execute(p1);15 exe.execute(p2);16 exe.execute(p3);17 18 exe.execute(c1);19 exe.execute(c2);20 exe.execute(c3);21 22 Thread.sleep(10*1000);23 24 p1.stop();25 p2.stop();26 p3.stop();27 28 Thread.sleep(3000);29 exe.shutdown();30 }31 }
优点:生产者-消费者模式能很好的对生产者线程和消费者线程进行解耦,优化系统的整体结构。同时,由于缓冲区的存在,运行生产者和消费者在性能上存在一定的差异,从而一定程度上缓解了性能瓶颈对系统性能的影响。