概念:当阻塞队列为空时,获取(take)操作是阻塞的;当阻塞队列为满时,添加(put)操作是阻塞的。
好处:阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。
体系:Collection
→Queue
→BlockingQueue
→七个阻塞队列实现类。
类名 | 作用 |
---|---|
ArrayBlockingQueue | 由数组构成的有界阻塞队列 |
LinkedBlockingQueue | 由链表构成的有界阻塞队列 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 支持优先级的延迟无界阻塞队列 |
SynchronousQueue | 单个元素的阻塞队列 |
LinkedTransferQueue | 由链表构成的无界阻塞队列 |
LinkedBlockingDeque | 由链表构成的双向阻塞队列 |
粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的。
需要注意的是LinkedBlockingQueue
虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE
,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor
有体现)。
API:抛出异常是指当队列满时,再次插入会抛出异常;返回布尔是指当队列满时,再次插入会返回false;阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入。超时是指当一个时限过后,才会插入或者取出。API使用见BlockingQueueDemo。
方法类型 | 抛出异常 | 返回布尔 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(E e) | offer(E e) | put(E e) | offer(E e,Time,TimeUnit) |
取出 | remove() | poll() | take() | poll(Time,TimeUnit) |
队首 | element() | peek() | 无 | 无 |
SynchronousQueue
队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个;
使用阻塞队列实现生产者消费者模式
在学习阻塞队列之前,实现线程通信有两种方式,一种是 Object类的 wait(),以及notifyAll()
另一种是通过 Lock 与condition.await() 与 condition.signal() 实现。
下面我们使用阻塞队列实现生产者消费者模式:
package com.cpown.demo.blockingqueue;import org.junit.platform.commons.util.StringUtils;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** 使用阻塞队列实现,线程通信,交替生产消费,代替原来的 wait notifyAll* 以及 Lock Condition condition.await(); condition.signal();**/
public class BlockingQueueTest {
public static void main(String[] args) throws InterruptedException {
/*** 可以构造传入各种 BlockingQueue实现类 ArrayBlockingQueue,LinkedBlockingQueue等*/MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));new Thread(() ->{
System.out.println("生产线程启动");try {
myResource.prod();} catch (Exception e) {
e.printStackTrace();}},"Prod").start();new Thread(() ->{
System.out.println("消费线程启动");try {
myResource.consumer();} catch (Exception e) {
e.printStackTrace();}},"Consumer").start();TimeUnit.SECONDS.sleep(5);myResource.stop();}
}/*** 资源类*/
class MyResource {
/*** 默认开启通知生产 + 消费* 开关 标识 volatile保证可见性,随时叫停*/private volatile Boolean Flag = true;/*** 使用原子类保证并发性*/private AtomicInteger atomicInteger = new AtomicInteger();/*** 使用 BlockingQueue通用接口,可以灵活构造传入 各种类型的阻塞队列*/private BlockingQueue<String> blockingQueue = null;public MyResource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;}/*** 生产者* @throws Exception*/public void prod() throws Exception {
String data;boolean result;while (Flag){
//类似于 ++idata = atomicInteger.incrementAndGet()+"";//放入 阻塞队列中result = blockingQueue.offer(data,2, TimeUnit.SECONDS);if(result){
System.out.println(Thread.currentThread().getName() +"\t 插入队列成功 data ====>" + data);}else {
System.out.println(Thread.currentThread().getName() +"\t 插入队列失败 data ====>" + data);}TimeUnit.SECONDS.sleep(1);}System.out.println(Thread.currentThread().getName() +"\t 外界干预 生产结束");System.out.println();}/*** 消费* @throws Exception*/public void consumer() throws Exception {
String result;while (Flag){
/*** 取出队列中元素 ,超时则取出失败*/result = blockingQueue.poll(2, TimeUnit.SECONDS);if(StringUtils.isBlank(result)){
System.out.println(Thread.currentThread().getName() +"\t 超时未取到 消费退出");System.out.println();return;}System.out.println(Thread.currentThread().getName() +"\t 消费成功 result ====>" +result);}System.out.println(Thread.currentThread().getName() +"\t 外界干预 消费结束");}/*** Flag 等于 false之前,生产者消费者会一直协作执行* 调用stop 会立即通知停止*/public void stop(){
this.Flag = false;}
}
结果:
生产线程启动
消费线程启动
Prod 插入队列成功 data ====>1
Consumer 消费成功 result ====>1
Prod 插入队列成功 data ====>2
Consumer 消费成功 result ====>2
Prod 插入队列成功 data ====>3
Consumer 消费成功 result ====>3
Prod 插入队列成功 data ====>4
Consumer 消费成功 result ====>4
Prod 插入队列成功 data ====>5
Consumer 消费成功 result ====>5
Prod 外界干预 生产结束Consumer 超时未取到 消费退出