-----------------------------------------------------------------------------
3. Semaphore
??? 我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
“一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。”
??? 我们一般用它来控制某个对象的线程访问对象
??? 例如,对于某个容器,我们规定,最多只能容纳n个线程同时操作
使用信号量来模拟实现
具体代码如下(参考 [JCIP])
- import?java.util.Collections; ??
- import?java.util.HashSet; ??
- import?java.util.Set; ??
- import?java.util.concurrent.ExecutorService; ??
- import?java.util.concurrent.Executors; ??
- import?java.util.concurrent.Semaphore; ??
- ??
- public?class?TestSemaphore?{ ??
- ??
- ????public?static?void?main(String[]?args)?{ ??
- ????????ExecutorService?exec?=?Executors.newCachedThreadPool(); ??
- ????????TestSemaphore?t?=?new?TestSemaphore(); ??
- ????????final?BoundedHashSet<String>?set?=?t.getSet(); ??
- ??
- ????????for?(int?i?=?0;?i?<?3;?i++)?{//三个线程同时操作add??
- ????????????exec.execute(new?Runnable()?{ ??
- ????????????????public?void?run()?{ ??
- ????????????????????try?{ ??
- ????????????????????????set.add(Thread.currentThread().getName()); ??
- ????????????????????}?catch?(InterruptedException?e)?{ ??
- ????????????????????????e.printStackTrace(); ??
- ????????????????????} ??
- ????????????????} ??
- ????????????}); ??
- ????????} ??
- ??
- ????????for?(int?j?=?0;?j?<?3;?j++)?{//三个线程同时操作remove??
- ????????????exec.execute(new?Runnable()?{ ??
- ????????????????public?void?run()?{ ??
- ????????????????????set.remove(Thread.currentThread().getName()); ??
- ????????????????} ??
- ????????????}); ??
- ????????} ??
- ????????exec.shutdown(); ??
- ????} ??
- ??
- ????public?BoundedHashSet<String>?getSet()?{ ??
- ????????return?new?BoundedHashSet<String>(2);//定义一个边界约束为2的线程??
- ????} ??
- ??
- ????class?BoundedHashSet<T>?{ ??
- ????????private?final?Set<T>?set; ??
- ????????private?final?Semaphore?semaphore; ??
- ??
- ????????public?BoundedHashSet(int?bound)?{ ??
- ????????????this.set?=?Collections.synchronizedSet(new?HashSet<T>()); ??
- ????????????this.semaphore?=?new?Semaphore(bound,?true); ??
- ????????} ??
- ??
- ????????public?void?add(T?o)?throws?InterruptedException?{ ??
- ????????????semaphore.acquire();//信号量控制可访问的线程数目??
- ????????????set.add(o); ??
- ????????????System.out.printf("add:%s%n",o); ??
- ????????} ??
- ??
- ????????public?void?remove(T?o)?{ ??
- ????????????if?(set.remove(o)) ??
- ????????????????semaphore.release();//释放掉信号量??
- ????????????System.out.printf("remove:%s%n",o); ??
- ????????} ??
- ????} ??
- }??
- ??
??? 总结:Semaphore通常用于对象池的控制
4.FutureTask
??? 我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
??? “取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。
可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。
除了作为一个独立的类外,此类还提供了 protected 功能,这在创建自定义任务类时可能很有用。 “
??? 应用举例:我们的算法中有一个很耗时的操作,在编程的是,我们希望将它独立成一个模块,调用的时候当做它是立刻返回的,并且可以随时取消的
具体代码如下(参考 [JCIP])
- import?java.util.concurrent.Callable; ??
- import?java.util.concurrent.ExecutionException; ??
- import?java.util.concurrent.ExecutorService; ??
- import?java.util.concurrent.Executors; ??
- import?java.util.concurrent.FutureTask; ??
- ??
- public?class?TestFutureTask?{ ??
- ??
- ????public?static?void?main(String[]?args)?{ ??
- ????????ExecutorService?exec=Executors.newCachedThreadPool(); ??
- ???????? ??
- ????????FutureTask<String>?task=new?FutureTask<String>(new?Callable<String>(){//FutrueTask的构造参数是一个Callable接口??
- ????????????@Override??
- ????????????public?String?call()?throws?Exception?{ ??
- ????????????????return?Thread.currentThread().getName();//这里可以是一个异步操作??
- ????????????}}); ??
- ???????????? ??
- ????????????try?{ ??
- ????????????????exec.execute(task);//FutureTask实际上也是一个线程??
- ????????????????String?result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待??
- ????????????????System.out.printf("get:%s%n",result); ??
- ????????????}?catch?(InterruptedException?e)?{ ??
- ????????????????e.printStackTrace(); ??
- ????????????}?catch?(ExecutionException?e)?{ ??
- ????????????????e.printStackTrace(); ??
- ????????????} ??
- ????} ??
- ??
- }??
??? 总结:FutureTask其实就是新建了一个线程单独执行,使得线程有一个返回值,方便程序的编写
5. Exchanger
??? 我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
??? “可以在pair中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。 “
??? 应用举例:有两个缓存区,两个线程分别向两个缓存区fill和take,当且仅当一个满了,两个缓存区交换
??? 代码如下(参考了网上给的示例?? http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d)
- import?java.util.ArrayList; ??
- import?java.util.concurrent.Exchanger; ??
- ??
- public?class?TestExchanger?{ ??
- ??
- ????public?static?void?main(String[]?args)?{ ??
- ????????final?Exchanger<ArrayList<Integer>>?exchanger?=?new?Exchanger<ArrayList<Integer>>(); ??
- ????????final?ArrayList<Integer>?buff1?=?new?ArrayList<Integer>(10); ??
- ????????final?ArrayList<Integer>?buff2?=?new?ArrayList<Integer>(10); ??
- ??
- ????????new?Thread(new?Runnable()?{ ??
- ????????????@Override??
- ????????????public?void?run()?{ ??
- ????????????????ArrayList<Integer>?buff?=?buff1; ??
- ????????????????try?{ ??
- ????????????????????while?(true)?{ ??
- ????????????????????????if?(buff.size()?>=?10)?{ ??
- ????????????????????????????buff?=?exchanger.exchange(buff);//开始跟另外一个线程交互数据??
- ????????????????????????????System.out.println("exchange?buff1"); ??
- ????????????????????????????buff.clear(); ??
- ????????????????????????} ??
- ????????????????????????buff.add((int)(Math.random()*100)); ??
- ????????????????????????Thread.sleep((long)(Math.random()*1000)); ??
- ????????????????????} ??
- ????????????????}?catch?(InterruptedException?e)?{ ??
- ????????????????????e.printStackTrace(); ??
- ????????????????} ??
- ????????????} ??
- ????????}).start(); ??
- ???????? ??
- ????????new?Thread(new?Runnable(){ ??
- ????????????@Override??
- ????????????public?void?run()?{ ??
- ????????????????ArrayList<Integer>?buff=buff2; ??
- ????????????????while(true){ ??
- ????????????????????try?{ ??
- ????????????????????????for(Integer?i:buff){ ??
- ????????????????????????????System.out.println(i); ??
- ????????????????????????} ??
- ????????????????????????Thread.sleep(1000); ??
- ????????????????????????buff=exchanger.exchange(buff);//开始跟另外一个线程交换数据??
- ????????????????????????System.out.println("exchange?buff2"); ??
- ????????????????????}?catch?(InterruptedException?e)?{ ??
- ????????????????????????e.printStackTrace(); ??
- ????????????????????} ??
- ????????????????} ??
- ????????????}}).start(); ??
- ????} ??
- }??
??? 总结:Exchanger在特定的使用场景比较有用(两个伙伴线程之间的数据交互)