当前位置: 代码迷 >> Exchange >> java.util.concurrent详解(二)Se地图hore/FutureTask/Exchanger
  详细解决方案

java.util.concurrent详解(二)Se地图hore/FutureTask/Exchanger

热度:9590   发布时间:2013-02-26 00:00:00.0
java.util.concurrent详解(二)Semaphore/FutureTask/Exchanger

-----------------------------------------------------------------------------
3. Semaphore
??? 我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
“一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。”

??? 我们一般用它来控制某个对象的线程访问对象

??? 例如,对于某个容器,我们规定,最多只能容纳n个线程同时操作
使用信号量来模拟实现


具体代码如下(参考 [JCIP])

Java代码 复制代码?收藏代码
  1. import?java.util.Collections; ??
  2. import?java.util.HashSet; ??
  3. import?java.util.Set; ??
  4. import?java.util.concurrent.ExecutorService; ??
  5. import?java.util.concurrent.Executors; ??
  6. import?java.util.concurrent.Semaphore; ??
  7. ??
  8. public?class?TestSemaphore?{ ??
  9. ??
  10. ????public?static?void?main(String[]?args)?{ ??
  11. ????????ExecutorService?exec?=?Executors.newCachedThreadPool(); ??
  12. ????????TestSemaphore?t?=?new?TestSemaphore(); ??
  13. ????????final?BoundedHashSet<String>?set?=?t.getSet(); ??
  14. ??
  15. ????????for?(int?i?=?0;?i?<?3;?i++)?{//三个线程同时操作add??
  16. ????????????exec.execute(new?Runnable()?{ ??
  17. ????????????????public?void?run()?{ ??
  18. ????????????????????try?{ ??
  19. ????????????????????????set.add(Thread.currentThread().getName()); ??
  20. ????????????????????}?catch?(InterruptedException?e)?{ ??
  21. ????????????????????????e.printStackTrace(); ??
  22. ????????????????????} ??
  23. ????????????????} ??
  24. ????????????}); ??
  25. ????????} ??
  26. ??
  27. ????????for?(int?j?=?0;?j?<?3;?j++)?{//三个线程同时操作remove??
  28. ????????????exec.execute(new?Runnable()?{ ??
  29. ????????????????public?void?run()?{ ??
  30. ????????????????????set.remove(Thread.currentThread().getName()); ??
  31. ????????????????} ??
  32. ????????????}); ??
  33. ????????} ??
  34. ????????exec.shutdown(); ??
  35. ????} ??
  36. ??
  37. ????public?BoundedHashSet<String>?getSet()?{ ??
  38. ????????return?new?BoundedHashSet<String>(2);//定义一个边界约束为2的线程??
  39. ????} ??
  40. ??
  41. ????class?BoundedHashSet<T>?{ ??
  42. ????????private?final?Set<T>?set; ??
  43. ????????private?final?Semaphore?semaphore; ??
  44. ??
  45. ????????public?BoundedHashSet(int?bound)?{ ??
  46. ????????????this.set?=?Collections.synchronizedSet(new?HashSet<T>()); ??
  47. ????????????this.semaphore?=?new?Semaphore(bound,?true); ??
  48. ????????} ??
  49. ??
  50. ????????public?void?add(T?o)?throws?InterruptedException?{ ??
  51. ????????????semaphore.acquire();//信号量控制可访问的线程数目??
  52. ????????????set.add(o); ??
  53. ????????????System.out.printf("add:%s%n",o); ??
  54. ????????} ??
  55. ??
  56. ????????public?void?remove(T?o)?{ ??
  57. ????????????if?(set.remove(o)) ??
  58. ????????????????semaphore.release();//释放掉信号量??
  59. ????????????System.out.printf("remove:%s%n",o); ??
  60. ????????} ??
  61. ????} ??
  62. }??
[java] view plaincopy
  1. ??



??? 总结:Semaphore通常用于对象池的控制

4.FutureTask
??? 我们先来学习一下JDK1.5 API中关于这个类的详细介绍:

??? “取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。
可使用 FutureTask 包装 Callable 或 Runnable 对象。因为 FutureTask 实现了 Runnable,所以可将 FutureTask 提交给 Executor 执行。
除了作为一个独立的类外,此类还提供了 protected 功能,这在创建自定义任务类时可能很有用。 “

??? 应用举例:我们的算法中有一个很耗时的操作,在编程的是,我们希望将它独立成一个模块,调用的时候当做它是立刻返回的,并且可以随时取消的

具体代码如下(参考 [JCIP])

Java代码 复制代码?收藏代码
  1. import?java.util.concurrent.Callable; ??
  2. import?java.util.concurrent.ExecutionException; ??
  3. import?java.util.concurrent.ExecutorService; ??
  4. import?java.util.concurrent.Executors; ??
  5. import?java.util.concurrent.FutureTask; ??
  6. ??
  7. public?class?TestFutureTask?{ ??
  8. ??
  9. ????public?static?void?main(String[]?args)?{ ??
  10. ????????ExecutorService?exec=Executors.newCachedThreadPool(); ??
  11. ???????? ??
  12. ????????FutureTask<String>?task=new?FutureTask<String>(new?Callable<String>(){//FutrueTask的构造参数是一个Callable接口??
  13. ????????????@Override??
  14. ????????????public?String?call()?throws?Exception?{ ??
  15. ????????????????return?Thread.currentThread().getName();//这里可以是一个异步操作??
  16. ????????????}}); ??
  17. ???????????? ??
  18. ????????????try?{ ??
  19. ????????????????exec.execute(task);//FutureTask实际上也是一个线程??
  20. ????????????????String?result=task.get();//取得异步计算的结果,如果没有返回,就会一直阻塞等待??
  21. ????????????????System.out.printf("get:%s%n",result); ??
  22. ????????????}?catch?(InterruptedException?e)?{ ??
  23. ????????????????e.printStackTrace(); ??
  24. ????????????}?catch?(ExecutionException?e)?{ ??
  25. ????????????????e.printStackTrace(); ??
  26. ????????????} ??
  27. ????} ??
  28. ??
  29. }??


??? 总结:FutureTask其实就是新建了一个线程单独执行,使得线程有一个返回值,方便程序的编写

5. Exchanger
??? 我们先来学习一下JDK1.5 API中关于这个类的详细介绍:
??? “可以在pair中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。 “

??? 应用举例:有两个缓存区,两个线程分别向两个缓存区fill和take,当且仅当一个满了,两个缓存区交换

??? 代码如下(参考了网上给的示例?? http://hi.baidu.com/webidea/blog/item/2995e731e53ad5a55fdf0e7d)

Java代码 复制代码?收藏代码
  1. import?java.util.ArrayList; ??
  2. import?java.util.concurrent.Exchanger; ??
  3. ??
  4. public?class?TestExchanger?{ ??
  5. ??
  6. ????public?static?void?main(String[]?args)?{ ??
  7. ????????final?Exchanger<ArrayList<Integer>>?exchanger?=?new?Exchanger<ArrayList<Integer>>(); ??
  8. ????????final?ArrayList<Integer>?buff1?=?new?ArrayList<Integer>(10); ??
  9. ????????final?ArrayList<Integer>?buff2?=?new?ArrayList<Integer>(10); ??
  10. ??
  11. ????????new?Thread(new?Runnable()?{ ??
  12. ????????????@Override??
  13. ????????????public?void?run()?{ ??
  14. ????????????????ArrayList<Integer>?buff?=?buff1; ??
  15. ????????????????try?{ ??
  16. ????????????????????while?(true)?{ ??
  17. ????????????????????????if?(buff.size()?>=?10)?{ ??
  18. ????????????????????????????buff?=?exchanger.exchange(buff);//开始跟另外一个线程交互数据??
  19. ????????????????????????????System.out.println("exchange?buff1"); ??
  20. ????????????????????????????buff.clear(); ??
  21. ????????????????????????} ??
  22. ????????????????????????buff.add((int)(Math.random()*100)); ??
  23. ????????????????????????Thread.sleep((long)(Math.random()*1000)); ??
  24. ????????????????????} ??
  25. ????????????????}?catch?(InterruptedException?e)?{ ??
  26. ????????????????????e.printStackTrace(); ??
  27. ????????????????} ??
  28. ????????????} ??
  29. ????????}).start(); ??
  30. ???????? ??
  31. ????????new?Thread(new?Runnable(){ ??
  32. ????????????@Override??
  33. ????????????public?void?run()?{ ??
  34. ????????????????ArrayList<Integer>?buff=buff2; ??
  35. ????????????????while(true){ ??
  36. ????????????????????try?{ ??
  37. ????????????????????????for(Integer?i:buff){ ??
  38. ????????????????????????????System.out.println(i); ??
  39. ????????????????????????} ??
  40. ????????????????????????Thread.sleep(1000); ??
  41. ????????????????????????buff=exchanger.exchange(buff);//开始跟另外一个线程交换数据??
  42. ????????????????????????System.out.println("exchange?buff2"); ??
  43. ????????????????????}?catch?(InterruptedException?e)?{ ??
  44. ????????????????????????e.printStackTrace(); ??
  45. ????????????????????} ??
  46. ????????????????} ??
  47. ????????????}}).start(); ??
  48. ????} ??
  49. }??



??? 总结:Exchanger在特定的使用场景比较有用(两个伙伴线程之间的数据交互)

  相关解决方案