当前位置: 代码迷 >> 综合 >> 阻塞队列 BlockingQueue 实现生产者消费者模式线程通信
  详细解决方案

阻塞队列 BlockingQueue 实现生产者消费者模式线程通信

热度:53   发布时间:2024-02-28 04:50:27.0

概念:当阻塞队列为空时,获取(take)操作是阻塞的;当阻塞队列为满时,添加(put)操作是阻塞的。
好处:阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。

体系CollectionQueueBlockingQueue→七个阻塞队列实现类。

类名 作用
ArrayBlockingQueue 数组构成的有界阻塞队列
LinkedBlockingQueue 链表构成的有界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 支持优先级的延迟无界阻塞队列
SynchronousQueue 单个元素的阻塞队列
LinkedTransferQueue 由链表构成的无界阻塞队列
LinkedBlockingDeque 由链表构成的双向阻塞队列

粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的。

需要注意的是LinkedBlockingQueue虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor有体现)。

API:抛出异常是指当队列满时,再次插入会抛出异常;返回布尔是指当队列满时,再次插入会返回false;阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入。超时是指当一个时限过后,才会插入或者取出。API使用见BlockingQueueDemo。

方法类型 抛出异常 返回布尔 阻塞 超时
插入 add(E e) offer(E e) put(E e) offer(E e,Time,TimeUnit)
取出 remove() poll() take() poll(Time,TimeUnit)
队首 element() peek()

SynchronousQueue

队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个;

使用阻塞队列实现生产者消费者模式

在学习阻塞队列之前,实现线程通信有两种方式,一种是 Object类的 wait(),以及notifyAll()

另一种是通过 Lock 与condition.await() 与 condition.signal() 实现。

下面我们使用阻塞队列实现生产者消费者模式:

package com.cpown.demo.blockingqueue;import org.junit.platform.commons.util.StringUtils;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** 使用阻塞队列实现,线程通信,交替生产消费,代替原来的 wait notifyAll* 以及 Lock Condition condition.await(); condition.signal();**/
public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
    /*** 可以构造传入各种 BlockingQueue实现类 ArrayBlockingQueue,LinkedBlockingQueue等*/MyResource myResource = new  MyResource(new ArrayBlockingQueue<>(10));new Thread(() ->{
    System.out.println("生产线程启动");try {
    myResource.prod();} catch (Exception e) {
    e.printStackTrace();}},"Prod").start();new Thread(() ->{
    System.out.println("消费线程启动");try {
    myResource.consumer();} catch (Exception e) {
    e.printStackTrace();}},"Consumer").start();TimeUnit.SECONDS.sleep(5);myResource.stop();}
}/*** 资源类*/
class MyResource {
    /*** 默认开启通知生产 + 消费* 开关 标识 volatile保证可见性,随时叫停*/private volatile Boolean Flag = true;/*** 使用原子类保证并发性*/private AtomicInteger atomicInteger = new AtomicInteger();/*** 使用 BlockingQueue通用接口,可以灵活构造传入 各种类型的阻塞队列*/private BlockingQueue<String> blockingQueue = null;public MyResource(BlockingQueue<String> blockingQueue) {
    this.blockingQueue = blockingQueue;}/*** 生产者* @throws Exception*/public void prod() throws Exception {
    String data;boolean result;while (Flag){
    //类似于 ++idata = atomicInteger.incrementAndGet()+"";//放入 阻塞队列中result = blockingQueue.offer(data,2, TimeUnit.SECONDS);if(result){
    System.out.println(Thread.currentThread().getName() +"\t 插入队列成功 data ====>" + data);}else {
    System.out.println(Thread.currentThread().getName() +"\t 插入队列失败 data ====>"  + data);}TimeUnit.SECONDS.sleep(1);}System.out.println(Thread.currentThread().getName() +"\t 外界干预 生产结束");System.out.println();}/*** 消费* @throws Exception*/public void consumer() throws Exception {
    String result;while (Flag){
    /*** 取出队列中元素 ,超时则取出失败*/result = blockingQueue.poll(2, TimeUnit.SECONDS);if(StringUtils.isBlank(result)){
    System.out.println(Thread.currentThread().getName() +"\t 超时未取到 消费退出");System.out.println();return;}System.out.println(Thread.currentThread().getName() +"\t 消费成功 result ====>" +result);}System.out.println(Thread.currentThread().getName() +"\t 外界干预 消费结束");}/*** Flag 等于 false之前,生产者消费者会一直协作执行* 调用stop 会立即通知停止*/public void stop(){
    this.Flag = false;}
}

结果:

生产线程启动
消费线程启动
Prod	 插入队列成功 data ====>1
Consumer	 消费成功 result ====>1
Prod	 插入队列成功 data ====>2
Consumer	 消费成功 result ====>2
Prod	 插入队列成功 data ====>3
Consumer	 消费成功 result ====>3
Prod	 插入队列成功 data ====>4
Consumer	 消费成功 result ====>4
Prod	 插入队列成功 data ====>5
Consumer	 消费成功 result ====>5
Prod	 外界干预  生产结束Consumer	 超时未取到 消费退出