当前位置: 代码迷 >> 综合 >> Java多线程(十)——Executor
  详细解决方案

Java多线程(十)——Executor

热度:98   发布时间:2023-11-03 05:09:50.0

构建线程优势很花时间,因为他涉及到和操作系统的交互。如果你的程序创建了大量生存时间很短的线程,那么你应该使用线程池。线程池包含很多可以等待运行的线程。你可以将一个Runnable对象传递给线程池。当run方法结束之后,线程不会死亡,而是会等待下一次调用。
另外一个使用线程池的原因是,限定当前一共能运行的线程数量。创建大量线程可能会使系统性能下降,甚至使虚拟机崩溃,如果你有一个程序创建了大量线程,那么你应该使用线程池限定线程的数量。
Executor方法创建了大量的静态工厂方法,用于创建线程池。总结如下:

方法 描述
newCachedThreadPool 根据需要创建线程,线程结束后保留60秒
newFixedThreadPool 创建固定数量线程的线程池,空闲线程无限保留
newSingleThreadExecutor 创建只有一个线程的线程池,他会运行所有提交的任务
newScheduledThreadPool 固定大小的线程池
newSingleThreadScheduledExecutor 单个线程的线程池

线程池

首先我们看看前三个方法,后三个方法在后面再讨论。newCachedThreadPool方法创建一个线程池,这个线程池会马上运行提交的任务,如果有空闲的线程,那么就是用空闲的线程,否则创建一个新的。newFixedThreadPool创建一个固定大小的线程池。如果提交的任务比空闲的线程多,那么多出来的任务将会被放在一个队列中。newSingleThreadExecutor方法返回一个大小为1的固定线程池,只有一个线程在处理任务,所有任务串行执行。这三个方法返回一个实现了ExecutorService方法的ThreadPoolExecutor类。
你可以向ExecutorService提交一个Runnable或者Callable,通过使用下面的方法

Future<?> submit(Runnable task);
Future<T> submit(Runnable task, T result);
Future<T> submit(Callable<T> task);

线程池会在线程空闲的时候执行任务。当你调用submit的时候,你获取得到的Future对象,你可以查询当前的运行状态。
第一个submit方法返回一个奇怪的类型Future<?>,你可以使用这个对象调用isDone, cancel或者isCancelled方法。但是,在完成时,get方法只会返回null。
第二个submit方法也提交一个Runnable,返回结果的get方法返回与result类型一致的类型。
第三个方法输入一个Callable,返回的Future对象在线程结束时获取结果。
当你停止使用线程池时,你应该调用shutdown。这个方法会使线程池销毁。当线程池销毁时,不在接受任务。当线程池中所有已经提交的任务结束时,线程池死亡。作为替代,你可以使用shutdownNow。线程池曲线所有还没有开始的任务,并试图打断正在执行的任务。
总结一下,下面是你使用线程池的步骤:

  1. 调用Executors类的静态方法newCachedThreadPool或者newFixedThreadPool。
  2. 调用submit,提交Runnable或者Callable对象。
  3. 如果你想要取消任务,你需要使用返回的Future对象。
  4. 当你不再需要提交任何任务时,使用shutdown方法

定时执行

ScheduledExecutorService接口可用于定时执行重复的任务。它是对java.util.Timer的一种泛化。newScheduledThreadPool和newSingleThreadScheduledExecutor方法返回实现了ScheduledExecutorService接口的对象。
你可以指定Runnable或Callable延迟一段时间之后,只运行一次,你也可以指定他们重复运行。具体方法查看API。

管理任务组

你已经看到了线程池的高效运作。有时,线程池的作用可能仅仅是管理任务。比如,你可以使用shutdownNow方法使所有线程停止。
invokeAny方法一个Callable对象的集合,然后返回一个完整的任务。你没有办法知道哪个任务是最先完成的。你可以使用这个方法随机地从所有线程中取得一个结果。比如,你想在一堆数字中查找到自己想要的,那么只要有一个找到了,那么其他所有线程就可以停止了。
invokeAll方法同样提交一个Callable对象的集合,但是返回一个Future列表,你可以获取所有任务的结果。

List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for(Future<T> result: results)processFurther(result.get());

这种方法的缺点时,如果第一个线程耗时很久,那么你要等很长时间才能开始处理。更合理的办法是,按照线程返回的时间顺序处理结果。这就是ExecutorCompletionService出现的原因。
创建一个线程池,然后构造一个ExecutorCompletionService对象,提交所有任务,对象会自动管理一个阻塞队列,用于接收结果,然后你就可以写出更高效的代码:

ExecutorCompletionService service = new ExecutorCompletionService(executor);
for(Callable<T> task: tasks) service.submit(task);
for(int i = 0;i < tasks.size();i++)processFurther(service.taks().get());

同步器

java.util.concurrent包包含了一些有用的同步器,用于同步多个线程,使他们有序执行。如果你有下面的使用场景,你就可以直接使用他们,而不用手动设计锁和条件对象。

它干了什么 啥时候用
CyclicBarrier 在指定数量的线程达到某个地方之前,阻塞先达到的线程,条件满足之后,再做其他选择 如果你需要等待一个或多个线程的结果
CountDownLatch 在计数器减为0之前,阻塞线程 当一个事件需要发生固定次数,其他线程才能运行时
Exchanger 当两个线程同时满足交换条件时,交换某个变量的值 当两个线程处理同一种结构的数据,而又需要交换信息时
Semaphore 在条件允许之前,阻塞线程 限制获取同一资源的线程数量,如果条件是1,那么就是一个线程等待另外一个线程的允许才能继续执行
SynchronousQueue 允许一个线程向另外一个线程提交对象 将一个对象从一个线程发送到另外一个线程,不需要额外的显式同步操作

信号量

概念上说,信号量管理的是一系列通行证。为了通过信号量,一个线程需要调用acquire方法。只有当指定的通行许可达到时,才允许通行。其他线程可以通过release方法发布许可。没有许可对象,信号量只保留一个计数器。许可可以不由需要通过它的线程发布,事实上,任何线程可以发布任意数量的许可。如果发布的许可数量大于最大值,那么信号量就会当做最大值处理。这种方式使得信号量非常灵活,而又非常难以理解。
在任何一本操作系统的书中,你都可以找到信号量的描述,他们可以实现多种多样的控制技巧。我们建议只有在非常适合使用信号量的场景下,你才应该使用他。
信号量一个最简单的用途就是将计数变量设置为1,这样,你就可以使用一个线程控制另外一个线程的运行和等待。

自旋锁

CountDownLatch可以在他计数达到0之前,阻塞线程。自旋锁是一次性的,你只能减它,不能加它。
一个有用的特殊场景是自旋锁设置为1,这样你就可以制造一个一次性门,线程在另外一个线程操作自旋锁之前,保持阻塞。
想想以下场景。工作线程在等待数据初始化,当负责数据初始化的线程完成工作之后,将自旋锁减1,此时工作线程收到信号,就可以开始读取数据了。
你也可以使用自旋锁收集数据。将自旋锁的值设置为线程数量,然后在线程技术之前,让自旋锁减1,这样当自旋锁减到0时,你就可以收集数据了。

障碍物

CyclicBarrier实现了一个叫障碍物的对象。想象一堆线程在负责某个计算的每一部分。当所有计算结束后,结果需要被汇总。当一个线程结束后,我们使用障碍物。当所有线程到达障碍物之后,障碍物释放,所有线程运行。
下面是细节。首先,构造一个障碍物,给定参与障碍物的线程数量。

CyclicBarrier barrier = new CyclicBarrier(nthreads);

每个线程进行计算,然后调用障碍物的await方法

public void run()
{
    doWork();barrier.await();
}

await方法可以接受一个超时参数:

barrier.await(100,TimeUnit.MULLISECONDS);

只要有一个线程突破了障碍物,那么障碍物自动消失(突破有两种可能,一种是所有线程达到了障碍物,一种是超时)。在这种情况下,其他所有await方法抛出BrokenBarrierException方法,正在等待的线程将会退出await方法。
当所有线程到达障碍物之后,你可以指定一种行为。

Runnable barrierAction = ...;
CyclicBarrier barrier = new CyclicBarrier(nthreads,barrierAction);

这个行为可能是想等待一个最终的结果。
另外,cyclicBarrier是可以重复使用的,当所有线程通过障碍物之后,你可以重新使用他们。

交换器

通常用于一个线程填充缓冲区,另外一个现场使用数据。当所有工作完成后,他们交换缓冲区。

同步队列

同步队列是为了配对生产者线程和消费者线程。当一个线程调用SynchronousQueue的put方法时,直到另外一个线程调用take方法之前,他都处于阻塞状态。反之亦然。和Exchanger不同的是,数据传输是单向的,从生产者到消费者。
虽然SynchronousQueue类实现了BlockingQueue接口,他不是真正意义上的队列。他不包含任何元素,size方法永远返回0.

  相关解决方案