参考链接:
JAVA中J.U.C 包下并发类的应用
文章目录
-
- Lock 独占锁
-
- ReentrantLock 实现类
- Condition 条件变量
- ReadWriteLock 读写锁
-
- ReentrantReadWriteLock 实现类
- StampedLock 邮戳锁
- Atomic 原子类
-
- 原子对象原理分析
- 原子对象存在的问题
- 并发工具类
-
- CountDownLatch类
- CyclicBarrier类
- Semaphore类
- 线程池
-
- ThreadPoolExecutor类
- ScheduledThreadPoolExecutor类
Lock 独占锁
主要用来解决互斥问题。
接口方法:
-
void lock()
—— 获取锁对象,优先考虑是锁的获取,而非中断。 -
void lockInterruptibly()
—— 获取锁,但优先响应中断而非锁的获取。 -
boolean tryLock()
——试图获取锁,如果返回true,则获取成功,直接使用。不需要继续lock() -
boolean tryLock(long timeout, TimeUnit timeUnit)
——试图获取锁,并设置等待时长。 -
void unlock()
——释放锁对象
ReentrantLock 实现类
特性:
互斥——每次只能由一个线程进入,悲观锁的特性。
公平——在等待序列中,谁先来的谁先获取。sync是非公平的,由OS调度。
注意事项:
——由于Lock必须手动释放锁
,sync是自动释放的。所以我们必须写try catch finally代码块,进行锁的释放。
——为了使用其可公平的特性,我们可以在构造方法中传fair:true
,用来构建公平锁。
——也可以在tryLock()
时,设置超时时间。
我们写一个简易的代码来说明这几点:
RentrantLock lock=new ReentrantLock(true);//设置为公平锁public void test1(){
boolean b=lock.tryLock(1,TimeUnit.SECONDS);//尝试获取,等待1秒if(b){
try{
//同步代码}finally{
lock.unLock();//释放锁}}}
Condition 条件变量
J.U.C包提供的Condition接口,用以对原生的Object.wait()
、Object.notify()
进行增强。我们可以借助Condition对象,然后基于锁实现线程之间的通讯
。
接口方法:
await()
——对标wait()
signal()
——对标notify()
signalAll()
——对标notifyAll()
Condition的强大之处在于它可以为多个线程间建立不同的Condition。
await()
被唤醒的四个条件:.
1.其他线程使用signal()
唤醒了这个线程。
2.当前线程恰好被选中唤醒。
3…其他线程使用signalAll()
唤醒了这个线程。
4.Some other thread {@linkplain Thread#interrupt interrupts} the current thread, and interruption of thread suspension is supported; or spurious wakeup; occurs.(此处存疑,无法准确翻译,自行理解。)
那么,我们可以尝试写一个阻塞式的栈了。
class BlockingStack{
Stack<String> stack = new Stack<>();int CAPACITY = 5;ReentrantLock lock = new ReentrantLock();Condition stackEmptyCondition = lock.newCondition();Condition stackFullCondition = lock.newCondition();public void pushToStack(String item){
//这是一个写入方法try {
lock.lock();//获得锁对象while(stack.size() == CAPACITY) {
stackFullCondition.await();//将当前的写线程沉睡并释放锁。并且,只能用stackFullCondition对象的signal()唤醒。}stack.push(item);//被popFromStack()方法取出后唤醒stackEmptyCondition.signalAll();//此时,popFromStack()中可能有被沉睡的,对读线程唤醒} finally {
lock.unlock();//释放锁对象}}public String popFromStack() {
try {
lock.lock();while(stack.size() == 0) {
stackEmptyCondition.await();//读线程被阻塞,只有stackEmptyCondition对象执行signal()}return stack.pop();//被写线程唤醒后,继续执行} finally {
stackFullCondition.signalAll();//此处唤醒所有写线程lock.unlock();}}
}
在这个案例中,我们使用两个Condition对象。
可以发现,stackEmptyCondition对象执行唤醒时,只能唤醒stackEmptyCondition对象沉睡的线程。
这样的设计非常灵活的标记了线程,让我们有了很多操作空间。
使我们可以在别的线程中对其他特定的线程进行沉睡和唤醒。以达成线程的通讯。
ReadWriteLock 读写锁
ReadWriteLock是一个是读写锁接口,其中“读锁”又称“共享锁”,能同时被多个线程获取。“写锁”又称独占锁,只能被一个线程获取。
读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许的,这是读写锁在读多写少
的场景下性能优于互斥锁的关键。
接口方法:
Lock readLock()
——返回一个读锁
Lock writeLock()
——返回一个写锁
ReentrantReadWriteLock 实现类
那么,很多同学会有疑问,既然读不会被限制,是不是只用写锁就够了呢?反正读操作允许多个线程。
我先了解一下读写锁的特点
读写锁特点:
1.ReentrantReadWriteLock可以让多个读线程可以同时持有读锁(只要写锁未被占用)。
2.一对读锁和写锁,是从一个readWriteLock对象获得的。他们是具有关联性的。
不同的readWriteLock对象获得的读锁和写锁之间是无法正常使用的。
3.ReadWriteLock中的锁不支持升级操作——读锁内套写锁,这样会导致写锁永久等待。
4.ReadWriteLock中的锁虽不支持升级操作,但支持降级操作——写锁内套读锁。
根据读写锁的特点,我们大致可以看到这样一种情况:
在进行写操作时,读锁是不允许多线程的,这样做是为了保护数据安全。
那么如果不加读锁,在写入的时候,读操作就会访问到写操作的中间值,或者前面的线程读到旧值,后面的线程读到新值。
因此,读锁的存在,是为了在写锁被占用时,对读操作进行限制,以获得正确的数据。
效率问题
1.由于写锁是独占的,当大量的出现写操作的时候,此时的读锁并不能发挥出很好的性能,因为写锁总是被占用。
读锁只能提供一丢丢性能的提升,甚至效率降低。
2.由于读写锁本身也是一套逻辑,需要运算。而且锁的粒度本身也很小。
如果写操作本身跑的就很快,可以快速的完成写入。那么使用读写锁有时候还不如单纯的使用互斥锁。
饥饿问题
在非公平策略下,线程是由OS调度的。
如果读的操作非常大量,而写的线程很少,很可能会出现写线程很难被选中的情况。
甚至永远无法选中,直到饿死。(等太久导致写入已经失去意义)
当多个线程同时读的时候,所有的写操作会被阻塞。
class MapCache{
private Map<String,Object> map=new HashMap<>();private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();public void write(String key,Object value){
Lock lock = readWriteLock.writeLock();//获取一个写锁,独占锁,只允许一个线程进入。try {
lock.lock();map.put(key,value);}finally {
lock.unlock();}}public Object read(String key){
Lock lock = readWriteLock.readLock();//获得一个读锁,允许多个线程进入。try {
lock.lock();return map.get(key);}finally {
lock.unlock();}}
}
StampedLock 邮戳锁
该类是对ReadWriteLock的增强。
优化了读锁、写锁,并使它们可以相互转换,实现了更加细粒度的并发控制。
三种模式:
写锁:加锁成功返回一个stamp,解锁用。
悲观读:加锁成功返回一个stamp,解锁用。
乐观读:乐观读的时候,允许一个写操作获得写锁。
注意事项:
- 所有获取锁的方法,都返回一个邮戳(Stamp),Stamp为0表示获取失败,其余都表示成功。
- 释放锁时需要一个Stamp值,这个值必须是和成功获取锁时得到的Stamp值是一致的。
- StampedLock是不可重入的。(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
- StampedLock支持读锁和写锁的相互转换。
- 无论写锁还是读锁,都不支持Conditon等待。
我们尝试写一个线程安全的Cache类
class StampedMapCache{
private Map<String,Object> map=new HashMap<>();StampedLock stampedLock=new StampedLock();//创建一个锁对象public void writeKeyObject(String key,Object value){
long stamp = stampedLock.writeLock();//获得写锁try {
map.put(key,value);}finally {
stampedLock.unlockWrite(stamp);//用邮戳解锁}}public Object readObject(String key){
long stamp = stampedLock.readLock();//获得读锁try {
Object o = map.get(key);return o;}finally {
stampedLock.unlockWrite(stamp);//用邮戳解锁}}public Object readOptimisticObject(String key){
long stamp = stampedLock.tryOptimisticRead();//获得乐观读try {
map.get(key);}finally {
stampedLock.unlock(stamp);//乐观读解锁}return null;}
}
总结:
相比ReadWrite Lock读写锁,StampedLock通过提供乐观读在读线程多,写线程少的情况下可以提供更好的性能,因为乐观读不需要进行CAS设置锁的状态。
Java中的锁对象的最佳应用设置推荐?
1.更新对象的成员变量时加锁。
2.访问可变的成员变量时枷锁。
3.不在调用其他对象的方法时加锁。
Atomic 原子类
思考:有锁编程可以保证线程的安全,实际是保证了操作的原子性,但是会损耗一部分性能。那么有没有一种方法,可以不加锁又能保证线程的安全呢?
其实对于简单的原子性问题,还有一种无锁方案。JUC并发包将这种无锁方案封装提炼之后,实现了一系列的原子类。
比如,我们写一个原子性的long类型的原子类。
源子类提供了丰富的API实现一些运算。
AtomicLong al=new AtomicLong(1000);//创建原子long并赋值为1000public long atomicCount(){
return al.getAndAdd(100);//+100并返回}
在使用原子类时,getAndAdd(100)
这个操作意为原子性的,相当于为它加了一个小锁,但是比使用互斥锁的性能优秀很多,我们下面再谈。
//AtomicLong atomicLong=new AtomicLong(0);
// LongStream.range(0, 1000)
// .parallel()
// .forEach((t)->atomicLong.incrementAndGet());
// System.out.println(atomicLong.get());
无锁方案相对互斥锁方案,最大的好处就是性能。互斥锁方案为了保证互斥性,需要执行加锁、 解锁操作,而加锁、解锁操作本身就消耗性能。同时拿不到锁的线程还会进入阻塞状态,进而触发线程切换,线程切换对性能的消耗也很大。 相比之下,无锁方案则完全没有加锁、解锁的性能消耗,可谓绝佳方案。
原子对象原理分析
CPU 为了解决并发问题,提供了 CAS 指令(CAS,全称是 Compare And Swap,即“比较并交换”)。
首先线程读取到旧的A并进行写入操作,写入A——>C,写完后在提交之前
重新读取A的值,看是否为原来的A,如果是,则提交。如果否,则重新读取继续写入。
我们可以看一下incrementAndGet()
方法的实现
public final int incrementAndGet() {
for (;;) {
int current = get(); //取得当前的值int next = current + 1; //计算加1后的值if (compareAndSet(current, next))//这里有2个参数,current为读取的值,如果这个值没有改变,则改为next。return next;}}
说明:原子类的方法都是针对一个共享变量的,如果你需要解决多个变量的原子性问题,建议还是使用互斥锁方案。
原子对象存在的问题
Java中的无锁方案相对于互斥锁方案,优点非常多,首先性能好,其次是基本不会出现死锁问题,但也会有一些问题,例如:
——ABA问题:
这里我们模拟一个共享变量 int a=0;
线程1:读到a=0————————————————写入a=100成功
记录读到0 检查是否为0
线程2:读到a=0————写入a=1成功
线程3:————————————读到a=1————写入a=0成功
我们可以看到,线程1虽然成功写入100。但这是一次无效的修改,在这期间另外两个线程已经改过了多次a的值。
解决:
解决这个问题的方法很简单,记录一下变量的版本就可以了,在变量的值发生变化时,对应的版本也做出相应的变化,然后CAS操作时比较一下版本就知道变量有没有发生变化。此时可借助在java的atomic包下的AtomicStampedReference类
进行实现。
——自旋问题:
无锁对象会多次尝试CAS操作直至成功或失败,这个过程叫做自旋。正如我们上面的代码中的无线循环一样。
在这个过程中,它是不断运转的,可以避免上下文切换。
但是,永久的无线循环无疑是对CPU的极大浪费,因此自旋一般都会有个次数限制,即超过这个次数后线程就会放弃时间片,等待下次机会。
结论:
在资源竞争不激烈的情况下,自旋 确实能提高效率(避免切换上下文)。
但是在资源竞争特别激烈的场景中,CAS操作失败率就会大大提高。这种情况下,使用重量级锁
效率可能会更高。
当然,也可以使用LongAdder类
来替换,它则采用了分段锁的思想来解决并发竞争的问题。
并发工具类
CountDownLatch类
CountDownLatch是一个辅助同步类,用来作计数使用,它的作用有点类似于生活中的倒数计数器
,先设定一个计数初始值,当计数降到0时,将会触发一些事件。
现在,参考下列代码:
class CountDown{
static String content;@Testpublic void testCountDown() throws InterruptedException {
CountDownLatch count = new CountDownLatch(1);//构造时传入从几开始倒数new Thread(new Runnable() {
@SneakyThrows@Overridepublic void run() {
content="helloworld";//写入contentThread.sleep(5000);count.countDown();//计数1次}}).start();while (content==null){
count.await();}//如果content没有值,当前线程沉睡。System.out.println(content.toUpperCase());}
}
输出结果为:
副线程在写入conten后,休息了5秒。
在这期间,主线程并没有因为content==null
条件已经改变而输出结果。
而是等到副线程将主线程唤醒后,才输出了content。
但是问题来了,是因为count.countDown()
唤醒了线程吗?并不是,如果我们把new CountDownLatch(1)
中的1改为2。则会发现主线程会一直阻塞下去。
因此,我们可以得到它的一些特点:
- CountDownLatch对象的初始计数值,在构造CountDownLatch对象时传入。
- 每调用一次 countDown() 方法,计数值就会减1。
- 线程可以调用CountDownLatch的await方法进入阻塞。
- 当计数值降到0时,所有之前调用await阻塞的线程都会释放。
原理图如下:
说明:
CountDownLatch的初始计数值一旦降到0,无法重置。如果需要重置,可以考虑使用CyclicBarrier类
。
CyclicBarrier类
与CountDownLatch不同,CyclicBarrier可以循环使用。
它做的事情就是:让线程到达栅栏时被阻塞(调用await方法),直到到达栅栏的线程数满足指定数量要求时,栅栏才会打开放行。
这个应用,其实有点像军训报数,报数总人数满足教官认为的总数时,教官才会安排后面的训练。
public class JUC {
static CyclicBarrier cyclicBarrier=new CyclicBarrier(3, new Runnable() {
@Overridepublic void run() {
System.out.println("点到完毕");//三个线程进入阻塞。则唤醒}});static class SumTask implements Runnable{
@Overridepublic void run() {
try {
String name = Thread.currentThread().getName();System.out.println("开始执行复杂任务:"+name);
// Thread.sleep(2000);//模拟正在完成复杂任务。System.out.println("执行完成:"+name);cyclicBarrier.await();//执行完成任务,开始阻塞。}catch (Exception e){
e.printStackTrace();}}}public static void main(String[] args) {
SumTask task = new SumTask();for (int i = 0; i < 100; i++) {
new Thread(task).start();//新建一个线程执行SumTask.run方法//不停阻塞这个新建的线程。//直到线程数达到三个。唤醒所有线程}}
}
CyclicBarrier典型应用是一组任务,它们并行执行工作,然后在进行下一个步骤之前进行等待,直至所有的任务都完成。
Semaphore类
Semaphore,又名信号量
,这个类的作用有点类似于“许可证”。
我们因为一些原因需要控制同时访问共享资源的最大线程数量,比如出于系统性能的考虑需要限流,或者共享资源是稀缺资源,我们需要有一种办法能够协调各个线程,以保证合理的使用公共资源。
Semaphore维护了一个许可集
,其实就是一定数量的“许可证”。
Semaphore支持公平/非公平策略,这和ReentrantLock类似。
基本原理:
当有线程想要访问共享资源时,需要先获取(acquire)的许可(限量的);
如果许可不够了,线程需要一直等待,直到许可可用。
当线程使用完共享资源后,可以归还(release)许可给许可集,以供其它需要的线程使用。
现在我们尝试基于Semaphore实现一个简易的限流操作。
class CurrentLimitingService{
final Semaphore semaphore=new Semaphore(3,true);//true是公平策略,permits是初始许可数量————我们只配置了3个许可证。public void process(){
try {
semaphore.acquire();//线程在这里获取许可证String name = Thread.currentThread().getName();//获得许可的线程名System.out.println("获得许可的线程为:"+name);try {
Thread.sleep(2000);//模拟2秒的处理过程System.out.println(name+"已完成工作");}catch (Exception e){
e.printStackTrace();}}catch (Exception e){
e.printStackTrace();} finally {
semaphore.release();//归还许可证。}}public static void main(String[] args) {
CurrentLimitingService service = new CurrentLimitingService();for (int i = 0; i < 10; i++) {
//模拟10个线程进行并发new Thread(new Runnable() {
@Overridepublic void run() {
service.process();}}).start();}}
}
我们可以观察输出情况,每次只有三个线程可以进入这个方法。其他线程被阻挡在锁外。
线程池
ava中创建线程对象远不像创建一个普通对象那么简单。创建一般的对象,可能仅仅是在JVM的堆里分配一块内存而已;而创建一个线程,却需要调用操作系统内核的API,然后操作系统要为线程分配一系列的资源,这个创建成本一般会很高,所以可以把线程理解为一个重量级的对象,应该避免频繁创建和销毁。
ThreadPoolExecutor类
线程池的构造函数中提供了全参来让我们配置一些基本配置。
那现在我们可以尝试创建一个线程池:
class ThreadPool{
int corePoolSize=3;//线程池最少保存线程数量int maximumPoolSize=5;//最大数量long keepAliveTime=60;//超出最小数量时,保持不被销毁的时间TimeUnit timeUnit=TimeUnit.SECONDS;//时间单位BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue(3);//指定队列为数组队列ThreadFactory threadFactory=new ThreadFactory() {
//线程创建工厂@Overridepublic Thread newThread(Runnable r) {
return new Thread(r);}};RejectedExecutionHandler handler=new ThreadPoolExecutor.AbortPolicy();//线程池提供了4个内部类作为4种驳回策略,并且这四个内部类都实现了RejectedExecutionHandler//我们可以在这里使用默认的中断策略,当没有线程可用时抛出RejectedExecutionException异常//————————使用构造方法————————ThreadPoolExecutor pool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory,handler);
}
工作队列详解:
直接提交:
直接提交策略表示线程池不对任务进行缓存。新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。这种策略需要线程池具有无限增长的可能性。实现为:SynchronousQueue
有界队列:
当线程池中线程达到corePoolSize时,新进任务被放在队列里排队等待处理。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
无界队列:
使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
这里我们不得不提一下四种驳回策略,它们被定义在ThreadPoolExecutor类的内部类。并且实现了RejectedExecutionHandler
接口,我们可以在构造函数里直接构造它们。
1)CallerRunsPolicy
:提交任务的线程自己去执行该任务。
2) AbortPolicy
:默认的拒绝策略,会抛出 RejectedExecutionException异常
3) DiscardPolicy
:直接丢弃任务,没有任何异常抛出。
4) DiscardOldestPolicy
:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入 到工作队列。
线程池的关闭:
那线程池被我们构建出来后,如果关闭呢?应该怎样关闭。
我们可以使用ThreadPoolExecutor提供的方法:
.shutdown()
——将线程池关闭,设置为SHUTWDOWN状态,不在为新任务提供线程。已经提供线程的任务则继续执行。
.awaitTermination()
——等待任务执行完,需要传入一个时间。
shutdownNow()
——将线程池立即关闭,设置为STOP状态,此时不再为新任务提供线程,并尝试终止
并回收正在执行的任务。
并且这几个方法是上了互斥锁的,我们不需要担心重复关闭的问题。
现在我们尝试写一段关闭线程池的代码:
public static void closePool(ThreadPoolExecutor pool){
try{
pool.shutdown();//先停止对外提供线程pool.awaitTermination(5,TimeUnit.SECONDS);//等待其他线程完成工作}catch (InterruptedException e) {
e.printStackTrace();}finally {
if (!pool.isTerminated()){
System.out.println("取消未完成的工作");}pool.shutdownNow();//立即关闭STOP线程池}}
线程池的使用:
这里我们不得不提线程的创建方式,通常我们熟知的线程创建方式有以下几种:
继承Thread并重写run()方法。
实现Runnable并重写run()方法。
但是实现Runnable的方式并不能获得返回值,且无法抛出异常。
这时候我们需要Callable。
callable.call()
是一种可以拿到返回值的实现异步的方法,返回值为Futrue类型。
callable可以取消。
通常,我们需要使用ExecutorService
接口的submit方法调用。
ThreadPoolExecutor—继承—>AbstractExecutorService—实现—>ExecutorService
因此,我们可以使用线程池直接调用submit方法,提交一个异步任务。并且返回值是Futrue<V\>
类型。
而Futrue接口,直译为未来。意为未来的结果,它提供了五个方法,让我们对异步操作的执行结果进行操作:
cancel(boolean)
——取消这个任务,未开始或已完成返回false。
true:中断线程。
false:不中断正在执行的任务,任务正常执行,只能取消未开始的任务。
任务可能存在三个状态:
等待执行: 只要调用cancel(),一定会被取消,不论是true还是false。
执行中: 只要cancel(true)可以取消。
执行完成: cancel()已无效
isCancelled()
——判断是否已取消(未执行完被取消的)
isDone()
——判断是否已结束
get()
——获取结果(阻塞式,直到拿到结果)
get(timeout, unit)
——支持超时机制(阻塞式,超时返回null)
注意: 事实上,我们将Runnable传入线程池也可以获取异步的返回值。
这是因为在FutrueTask接口中,通过Executors类的RunnableAdaptor把两种接口的线程都统一为了Callable。
在RunnableAdapter中,call()方法调用了run()方法。以实现统一。
参考资料:
java Callable 实现原理分析
我们现在模拟一个获取异步结果的代码:
public void testPool() throws ExecutionException, InterruptedException {
ThreadPoolExecutor pool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory,handler);//创建池对象Future<Integer> future = pool.submit(new Callable<Integer>() {
@Overridepublic Integer call() throws Exception {
Thread.sleep(3000);//模拟一个3秒的复杂工作。return new Random().nextInt();}});System.out.println("是否执行完成:"+future.isDone());Integer re = future.get();//获取结果,如果没执行完,就阻塞本线程。直到FutrueTask中的状态改变,唤醒本线程。System.out.println("是否执行完成:"+future.isDone());System.out.println("执行结果为:"+re);closePool(pool);//关闭线程池}
流程分析:
在本代码中,先通过ThreadPoolExecutor对象调用父类AbstractExecutorService的submit()方法,传入Callble类型的回调方法。
又调用了newTaskFor()方法,创建了一个FutureTask对象,通过构造并传入我们的回调,在构造方法中把
this.callable = callable;
this.state = NEW;
在FutureTask对象中,重写了实现自RunnableFutrue的run();在run()中调用了call();
并且,在AbstractExecutorService中,在创建完FutrueTask对象后,把它传给了execute()方法,并把run()就绪,这里实际上调用的是call()方法。
假如我们代码中当初没有传入callable,而是传入runnable。则会先通过Futrue的构造方法,调用Executors.callable,再调用RunnableAdapter的构造方法将runnable转为callable。call()方法包裹了run();
继续,当主线程调用get()方法时,如果没有结果则会阻塞——FutrueTask中,get()通过调用awaitDone()方法,继续调用LockSupport.park()方法实现阻塞。
在FutrueTask中的run()方法中,通过call()方法获取结果后。run()调用finishCompletion()方法,继续调用LockSupport.unpark(t)方法对我们的主线程实施唤醒。
多任务批量执行:
线程池提供了invokeAll()方法对任务集合进行批量处理。
实例代码如下:
public void testPoll2() throws InterruptedException {
ThreadPoolExecutor pool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory,handler);//创建池对象List<Callable<String>> callables = Arrays.asList(() -> "task1",() -> "task2",() -> "task3");//通过lambda表达式,直接返回类型为String类型的任务结果。List<Future<String>> futures = pool.invokeAll(callables);Stream<Future<String>> stream = futures.stream();stream.map(future -> {
try {
return future.get();}catch (Exception e) {
throw new IllegalStateException(e);}}).forEach(System.out::println);closePool(pool);}
ScheduledThreadPoolExecutor类
Scheduled翻译为:排定的
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor类。
其内部将所有的Runnable任务包装成RunnableScheduledFuture类型,用于满足任务的延迟和周期性调度。
特点:
- 阻塞队列是无界队列——DelayedWorkQueue
- 使用ScheduledFutureTask作为任务封装类,代替原先的FutureTask类;作为ScheduledFutureTask的内部类,实现了RunnableScheduledFuture,封装了调度任务的执行逻辑。其中的time字段存放下一次执行时间,period字段存放执行周期,对于周期性执行任务,每次会根据period计算time。
- 重写了shutdown()方法,允许移除并且取消不需要在shutdown后执行的任务。
- 提供了decorateTask方法,用来定制任务操作;
构造器参数:
- corePoolSize:线程池核心工作线程数量;
- threadFactory:定制工作线程创建方式;
- handler:驳回任务处理策略;
使用DelayedWorkQueue作为阻塞队列,该队列为无界队列,因而maximumPoolSize属性配置无效。
又因为都是核心工作线程,没有非核心线程需要回收,因而keepAliveTime配置为0。
单次执行:
schedule()可以指定延迟时间
案例一:创建一延迟任务并执行,关键代码如下:
class Main2{
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor pool =new ScheduledThreadPoolExecutor(1);Runnable task = new Runnable() {
@Overridepublic void run() {
System.out.println("执行时间:" + System.nanoTime());}};System.out.println("开始计时:" + System.nanoTime());pool.schedule(task,3,TimeUnit.SECONDS);//延时3秒执行Thread.sleep(1000);pool.shutdownNow();}
}
周期性执行:
通过scheduleAtFixedRate()
、scheduleWithFixedDelay()
方法执行的任务均为周期性执行任务。
周期性执行的实现可以理解为每次执行完成后设定下一次执行时间,然后将任务重新放入到阻塞队列等待下一次调度。
scheduleAtFixedRate() / scheduleWithFixedDelay()执行的主要区别在于设置下一次执行时间的策略不同,而执行时间通过ScheduledFutureTask的time字段保存,通过ScheduledFutureTask#setNextRunTime()进行设置。
案例二:周期执行:
class Main3{
public static void main(String[] args) throws InterruptedException {
ScheduledThreadPoolExecutor pool =new ScheduledThreadPoolExecutor(1);Runnable task = new Runnable() {
@Overridepublic void run() {
System.out.println("执行时间:" + System.nanoTime());}};int initialDelay = 5;//初始延迟时间int period = 1;//执行周期pool.scheduleAtFixedRate(task,initialDelay,period,TimeUnit.SECONDS);}
}