1. 生产者消费者扩展
1.1 多个生产者、多个消费者
由一个生产者、一个消费者、一个商品 ==》 扩展为多个生产者、多个消费者、多个商品
- 最多有 10 个商品,最少有 0 个商品
- 已经有 10 个商品后,生产者就不再生产,还要通知消费者进行消费
- 没有商品时,消费者不再消费,还要通知生产者进行生产
生产者线程(任务)ProduceRunnable
public class ProduceRunnable implements Runnable{
private ProductFactory factory;public ProduceRunnable() {
}// 使用构造方法的方式赋值,保证为同一个商品public ProduceRunnable(ProductFactory factory) {
this.factory = factory;}@Overridepublic void run() {
int i = 0;while (true) {
try {
Thread.sleep(1000);} catch (InterruptedException e) {
e.printStackTrace();}// 生产一个商品factory.produce("生产商品" + i);i++;}}
}
消费者线程(任务)ConsumeRunnable
public class ConsumeRunnable implements Runnable{
private ProductFactory factory;public ProductFactory getProductFactory() {
return factory;}// 使用 set 方法的形式赋值public void setProductFactory(ProductFactory productFactory) {
this.factory = productFactory;}@Overridepublic void run() {
int i = 0;while (true) {
try {
Thread.sleep(1000);} catch (InterruptedException e) {
e.printStackTrace();}// 消费一个商品factory.consume("消费一个商品" + i);i++;}}
}
ProductFactory商品工厂类
public class ProductFactory {
/** 存储商品 */List<String> list = new LinkedList<>();int max = 10;/** 生产商品 */public synchronized void produce(String name) {
// 仓库已满,停止生产while (list.size() == max) {
// 使用 while 循环,当一直为满的状态时,一直等待try {
this.wait();} catch (InterruptedException e) {
e.printStackTrace();}}list.add(name);System.out.println(Thread.currentThread().getName() + "【生产】了商品,当前商品总数:" + list.size());// 生产满后,通知消费者if (list.size() == max) {
this.notifyAll(); // 唤醒所有,保证肯定可以唤醒对方的线程}}/** 消费商品 */public synchronized void consume(String name) {
// 仓库为空,就等待while (list.size() == 0) {
try {
this.wait();} catch (InterruptedException e) {
e.printStackTrace();}}list.remove(0);System.out.println(Thread.currentThread().getName() + "【消费】了商品,当前商品总数:" + list.size());// 仓库为空,通知生产者if (list.size() == 0) {
this.notifyAll();}}
}
测试类:
public static void main(String[] args) {
ProductFactory factory = new ProductFactory();// 创建 10 个生产者线程,并启动Runnable runnable1 = new ProduceRunnable(factory);for (int i = 0; i < 10; i++) {
new Thread(runnable1, "生产者" + i).start();}// 创建 20 个消费者线程,并启动ConsumeRunnable runnable2 = new ConsumeRunnable();runnable2.setProductFactory(factory);for (int i = 0; i < 20; i++) {
new Thread(runnable2, "消费者" + i).start();}
}
1.2 使用匿名内部类
可以将 ConsumeRunnable 和 ProduceRunnable 两个线程要做的任务类,在测试类中,直接写成匿名内部类的方式进行实现
匿名内部类中使用的局部变量,必须为 final 的, JDK8中可以省略 final
public static void main(String[] args) {
ProductFactory factory = new ProductFactory();// 创建 10 个生产者线程,并启动 【匿名内部类】Runnable runnable1 = new Runnable() {
@Overridepublic void run() {
int i = 0;while (true) {
try {
Thread.sleep(1000);} catch (InterruptedException e) {
e.printStackTrace();}// 生产一个商品factory.produce("生产商品" + i); // factory直接为本类的, 修饰为 final 的i++;}}};for (int i = 0; i < 10; i++) {
new Thread(runnable1, "生产者" + i).start();}// 创建 20 个消费者线程,并启动 【匿名内部类】Runnable runnable2 = new Runnable() {
@Overridepublic void run() {
int i = 0;while (true) {
try {
Thread.sleep(1000);} catch (InterruptedException e) {
e.printStackTrace();}// 消费一个商品factory.consume("消费一个商品" + i); // factory直接为本类的i++;}}};for (int i = 0; i < 20; i++) {
new Thread(runnable2, "消费者" + i).start();}
}
1.3 使用 Lock 锁
进一步优化:每次唤醒线程都是唤醒所有生产者和消费者( this.notifyAll() ) ==》 使用Lock锁 + Condition解决
public class ProductFactory {
/** 存储商品 */List<String> list = new LinkedList<>();int max = 10;// !!! 使用 Lock 锁 + ConditionLock lock = new ReentrantLock();Condition produceCondition = lock.newCondition();Condition consumeCondition = lock.newCondition();/** 生产商品 */public void produce(String name) {
lock.lock();try {
// 仓库已满,停止生产while (list.size() == max) {
try {
produceCondition.await(); // 生产者进入自己的等待队列} catch (InterruptedException e) {
e.printStackTrace();}}list.add(name);System.out.println(Thread.currentThread().getName() + "【生产】了商品,当前商品总数:" + list.size());// 生产一个后,就唤醒消费者consumeCondition.signal(); // 唤醒消费者等待队列中的随机一个} finally {
lock.unlock();}}/** 消费商品 */public void consume(String name) {
lock.lock();try {
// 仓库为空,就等待while (list.size() == 0) {
try {
consumeCondition.await();} catch (InterruptedException e) {
e.printStackTrace();}}list.remove(0);System.out.println(Thread.currentThread().getName() + "【消费】了商品,当前商品总数:" + list.size());// 消费一个后,就通知生产者进行生产produceCondition.signal();} finally {
lock.unlock();}}
}
- 使用 synchronized 不需要用户去手动释放锁,它是Java语言的关键字,当出现异常时,JVM会自动释放被占用的锁
- Lock 是一个类,必须用户手动去释放锁,不然可能会出现死锁的现象
- Condition 是 JDK1.5中出现的,使用
await(), signal()
实现线程间协作更加安全和高效- 可以实现一个同步队列和多个等待队列,从而能够更精准的控制多线程的休眠与唤醒
- 必须在 lock.lock() 和 lock.unlock() 之间才可以使用
2. 认识 Lock 锁API
2.1 Lock
- 可重入锁:Lock 、ReadWriteLock 、synchronized 都是可重入锁(自己可进入自己的锁)
- 独占锁和共享锁:WriteLock、ReentrantLock、synchronized 独占锁 ReadLock 共享锁
- 公平锁和非公平锁
Lock lock = new ReentrantLock();
// 获取锁1: 拿不到锁就一直等待,拿到就执行后续代码
lock.lock();
// 获取锁2: 拿不到就返回false,拿到就返回true
lock.tryLock();
// 获取锁3: 如果拿不到锁,就尝试指定的时间,时间到了还是没拿到,才放弃
lock.tryLock(10, TimeUnit.MICROSECONDS);
// 获取锁4: 拿不到锁就一直等待,中途可以被其他线程中断
lock.lockInterruptibly();// 解锁
lock.unlock();// 创建一个等待队列
lock.newCondition();
2.2 ReadWriteLock
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
// 写锁
Lock readLock = rwLock.readLock();
Lock readLock1 = rwLock.readLock();
System.out.println(readLock1 == readLock); // !!! 返回的是【同一把】锁
// 读锁
Lock writeLock = rwLock.writeLock();
实例:多个读操作可以同时进行,读写操作、写写操作都是互斥的
public class TestReadWriteLock {
public static void main(String[] args) {
final Operator operator = new Operator();// 创建 5 个读线程并启动Runnable readRunnable = new Runnable() {
@Overridepublic void run() {
operator.read();}};for (int i = 0; i < 10; i++) {
new Thread(readRunnable, "读线程" + i).start(); // 5个线程同时进行一个读操作}// 创建 5 个写线程并启动Runnable writeRunnable = new Runnable() {
@Overridepublic void run() {
operator.write();}};for (int i = 0; i < 10; i++) {
new Thread(writeRunnable, "写线程" + i).start();}}
}class Operator {
// private Lock lock = new ReentrantLock(); !!! 使用 Lock 会导致读线程进入后,必须执行完才能执行其他的private ReadWriteLock rwLock = new ReentrantReadWriteLock(); // !!! 使用 ReadWriteLock 可以达到多个读同时进行,写操作互斥public void read() {
// lock.lock();rwLock.readLock().lock();try {
System.out.println(Thread.currentThread().getName() + "读数据【开始】!");try {
Thread.sleep(10);} catch (InterruptedException e) {
e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "读数据【结束】!");} finally {
// lock.unlock();rwLock.readLock().unlock();}}public void write() {
// lock.lock();rwLock.writeLock().lock();try {
System.out.println(Thread.currentThread().getName() + "写数据【开始】!");try {
Thread.sleep(10);} catch (InterruptedException e) {
e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "写数据【结束】!");} finally {
// lock.unlock();rwLock.writeLock().unlock();}}
}
3. BlockingQueue
BlockingQueue即阻塞队列,位于 java.util.concurrent (JDK5)包中,被阻塞的情况主要有如下两种:
- 当队列满了的时候,进行入队操作
- 当队列空了的时候,进行出队操作
BlockingQueue 的方法:
可能抛出异常的操作 | 特殊值 | 可能会产生阻塞的操作 | 超时 | |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() |
使用 BlockingQueue 实现生产者消费者问题:
public class ProductFactory {
/** 存储商品 */BlockingQueue list = new ArrayBlockingQueue(10); // 使用 BlockQueue 实现/** 生产商品 */public void produce(String name) {
try {
list.put(name); // 放入值,满了之后再放会阻塞} catch (InterruptedException e) {
e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "【生产】了商品,当前商品总数:" + list.size());}/** 消费商品 */public void consume(String name) {
try {
list.take(); // 取出值,空了之后再取会阻塞} catch (InterruptedException e) {
e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "【消费】了商品,当前商品总数:" + list.size());}
}
注意:
- BlockingQueue 不接受 null 元素
- 可以是限定容量的
- 实现是线程安全的
常见的 BlockingQueue:
- ArrayBlockingQueue:有边界的(容量有限的)阻塞队列,底层实现是数组,容量大小一旦指定就不可改变,先进先出
- LinkedBlockingQueue:初始化时,如果指定容量就是有边界的,不指定则是无边界的,底层实现是链表,先进先出
4. volatile
- Java内存模型
- 可见性:一个线程修改的状态对另一个线程是可见的 volatile、synchronized、final 实现可见性
- 原子性:不可再分割,一旦执行就要执行完 synchronized、lock、unlock
- 有序性:禁止指令重排序 synchronized、volatile
当对非 volatile 变量进行读写的时候,每个线程先从内存拷贝变量到CPU缓存中。如果计算机有多个CPU,每个线程可能在不同的CPU上被处理,意味着每个线程可以拷贝到不同的 CPU cache中。 而声明变量是 volatile 的,JVM 保证了每次读变量都从内存中读,跳过了 CPU Cache 这一步
public class TestVolatile {
static volatile boolean flag = true; // 对所有线程可见public static void main(String[] args) {
// 主线程Runnable runnable = new Runnable() {
@Overridepublic void run() {
while (flag) {
System.out.println("=====================");}}};new Thread(runnable).start(); // Thread新线程try {
Thread.sleep(1000);} catch (InterruptedException e) {
e.printStackTrace();}flag = false;}
}
5. 多线程练习题
5.1 打印数字及字母
编写两个线程,一个线程打印 1-52 的整数,另一个线程打印字母 A-Z。打印顺序为 12A34B56C…5152Z
Printer类:
public class Printer {
private int index = 1; // 判断是第几次打印public int num = 1;public char c = 'A';Lock lock = new ReentrantLock();Condition numCondition = lock.newCondition();Condition charCondition = lock.newCondition();public void printNum() {
lock.lock();try {
while (index % 2 == 0) {
try {
numCondition.await();} catch (InterruptedException e) {
e.printStackTrace();}}System.out.print(num + "" + (num + 1));num += 2;index++;if (index % 2 == 0) {
charCondition.signal();}} finally {
lock.unlock();}}public void printChar() {
lock.lock();try {
while (index % 2 != 0) {
try {
charCondition.await();} catch (InterruptedException e) {
e.printStackTrace();}}System.out.print(c);c+=1;index++;if (index % 2 != 0) {
numCondition.signal();}} finally {
lock.unlock();}}
}
测试类:
public static void main(String[] args) {
Printer p = new Printer();new Thread(new Runnable() {
@Overridepublic void run() {
while (p.num <= 52) {
p.printNum();}}}).start();new Thread(new Runnable() {
@Overridepublic void run() {
while (p.c <= 'Z') {
p.printChar();}}}).start();
}
5.2 打印递增的数字
启动 3 个线程,打印递增的数字,线程 1 先打印 1, 2, 3, 4, 5 然后线程 2 打印 6, 7, 8, 9, 10,线程 3 打印 11, 12, 13, 14, 15,然后线程 1 再打印 16, 17, 18, 19, 20 …以此类推,直到打印到 60。
public class ThreadExercise {
public static void main(String[] args) {
final Print print = new Print();for (int i = 0; i < 3; i++) {
// 创建 3 个线程new Thread(new Runnable() {
@Overridepublic void run() {
for (int j = 0; j < 4; j++) {
// 每个线程执行 4 次打印方法print.print();}}}, i + "").start(); // 线程名字,正好是 0, 1, 2}}
}class Print {
// 要打印的数字private int num = 1;// 判断当前该哪个线程打印private int status = 0; // 3个线程,根据名称判断, 0~2, 所以循环的时候 % 3即可public synchronized void print() {
// 当前轮不到自己打印时,一直等待while ((status % 3) != Integer.parseInt(Thread.currentThread().getName())) {
try {
this.wait();} catch (InterruptedException e) {
e.printStackTrace();}}// 打印数字for (int i = 0; i < 5; i++) {
if (i > 0) {
System.out.print(",");}System.out.print(num++);}System.out.println();// 自己的任务执行完了,改变status的值status++;// 打印完成后,唤醒其他的线程this.notifyAll();}
}
5.3 模拟售票
启动 3 个线程打印递减数字,范围是 30 ~ 1。要求数字不能重复,每个线程打印一个数字后,立刻进入睡眠状态,时间为300毫秒
public class TicketSale {
public static void main(String[] args) {
Ticket ticket = new Ticket();new Thread(new Runnable() {
@Overridepublic void run() {
ticket.sale1();}}).start();new Thread(new Runnable() {
@Overridepublic void run() {
ticket.sale2();}}).start();new Thread(new Runnable() {
@Overridepublic void run() {
ticket.sale3();}}).start();}
}class Ticket {
private int num = 30;Lock lock = new ReentrantLock();Condition thread1 = lock.newCondition();Condition thread2 = lock.newCondition();Condition thread3 = lock.newCondition();public void sale1() {
while (num > 0) {
lock.lock();try {
thread2.signal();thread3.signal();System.out.println(Thread.currentThread().getName() + "售出:" + num--);try {
if (num > 0) {
thread1.await(300, TimeUnit.MICROSECONDS);}} catch (InterruptedException e) {
e.printStackTrace();}} finally {
lock.unlock();}}}public void sale2() {
while (num > 0) {
lock.lock();try {
thread1.signal();thread3.signal();System.out.println(Thread.currentThread().getName() + "售出:" + num--);try {
if (num > 0) {
thread2.await(300, TimeUnit.MICROSECONDS);}} catch (InterruptedException e) {
e.printStackTrace();}} finally {
lock.unlock();}}}public void sale3() {
while (num > 0) {
lock.lock();try {
thread2.signal();thread1.signal();System.out.println(Thread.currentThread().getName() + "售出:" + num--);try {
if (num > 0) {
thread3.await(300, TimeUnit.MICROSECONDS);}} catch (InterruptedException e) {
e.printStackTrace();}} finally {
lock.unlock();}}}
}
5.4 自定义容器
自定义容器,提供新增元素(add)和查看元素(get)方法。add方法向容器末尾位置新增元素,get方法通过参数传递的下标返回对应位置的元素。注意:get方法只读,不会删除对应位置的元素数据。
要求为容器提供读写锁能力。即写写互斥、读写互斥,多线程并发访问当前容器,只能有一个线程做写操作,可以有多个线程同时执行读操作。
public class MyContainer {
public static void main(String[] args) throws InterruptedException {
final Operate o = new Operate();for (int i = 0; i < 2; i++) {
new Thread(new Runnable() {
@Overridepublic void run() {
for (int j = 0; j < 5; j++) {
o.add("数据" + j);}}}, "写操作").start();}Thread.sleep(10);for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Overridepublic void run() {
for (int j = 0; j < 5; j++) {
System.out.println(Thread.currentThread().getName() + " get - " + o.get(new Random().nextInt(3)));}}}, "读操作").start();}}
}class Operate {
// 数据载体List<String> list = new ArrayList<>();final Object writeLock = new Object();// 是否有线程在执行写操作boolean isWrite = false;boolean isRead = false;public void add(String str) {
// 判断有没有其他线程读while (isRead) {
try {
Thread.sleep(10);} catch (InterruptedException e) {
e.printStackTrace();}}// 锁定写锁,synchronized 本身即为原子操作synchronized (writeLock) {
try {
// 判读有没有其他线程写while (isWrite) {
// 如果有在写的,就等待try {
writeLock.wait();} catch (InterruptedException e) {
e.printStackTrace();}}isWrite = true;list.add(str);System.out.println(Thread.currentThread().getName() + " add - " + str);// 唤醒其他写操作的线程writeLock.notifyAll();} finally {
// 可以省略isWrite = false;}}}public String get(int index) {
if (index >= list.size() || index < 0) {
throw new IndexOutOfBoundsException();}while (isWrite) {
// 有线程在执行写操作时,当前线程阻塞。 【自旋等待】try {
Thread.sleep(10);} catch (InterruptedException e) {
e.printStackTrace();}}try {
isRead = true;return list.get(index);} finally {
// 保证为原子操作isRead = false; // 读操作完成}}
}