缓存设置与读取,系统级别的任务计算。
经测试,采用这种方法设置缓存比使用monitor锁设置缓存快2-3倍,比不采取并发控制设置缓存就不比较了。
本示例使用memcached作为示例,本机缓存及其他缓存类似,直接上代码:
package concorrency; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import net.spy.memcached.MemcachedClient; /** * 设置与读取缓存的最佳方法 * @author donlianli@126.com */ public class DistributeCacheExample { private final ConcurrentMap<String, FutureTask<Object>> cache = new ConcurrentHashMap<String, FutureTask<Object>>(); private MemcachedClient memCachedClient ; private Vector<Long> useTimes = new Vector<Long>(); DistributeCacheExample(){ try { setMemCachedClient(new MemcachedClient( new InetSocketAddress("192.168.1.106", 11211))); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] argvs) throws InterruptedException, ExecutionException{ final DistributeCacheExample example = new DistributeCacheExample(); Thread[] threads = new Thread[50]; example.getMemCachedClient().flush().get(); for(int i=0;i<50;i++){ threads[i] = new Thread(new Runnable(){ public void run() { example.aService(1, 2); } }); threads[i].start(); } for(int i=0;i<50;i++){ try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } long totalTime = 0; for(Long u : example.getUseTimes()){ totalTime += u; } System.out.println("totalTime:\t" + totalTime); example.getUseTimes().clear(); example.getMemCachedClient().flush().get(); for(int i=0;i<50;i++){ threads[i] = new Thread(new Runnable(){ public void run() { example.bService(1, 2); } }); threads[i].start(); } for(int i=0;i<50;i++){ try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } totalTime = 0; for(Long u : example.getUseTimes()){ totalTime += u; } System.out.println("syn totalTime:\t" + totalTime); } /** * 服务方法 * 使用了并发包来控制并发 */ @SuppressWarnings("unchecked") public List<Long> aService(int para1,int param2){ long beginTime = System.nanoTime(); final String cacheKey = "IamKey"; List<Long> list = (List<Long>)getMemCachedClient().get(cacheKey); if(list == null){ // System.out.println("1.缓存未命中,将查询数据库或者调用远程服务"); //位命中缓存,开始计算 FutureTask<Object> f = cache.get(cacheKey); if (f == null) { Callable<Object> eval = new Callable<Object>() { public Object call() throws InterruptedException { // System.out.println(" go to dao or rmi"); List<Long> list = new ArrayList<Long>(); list.add(1l);list.add(2l); //将计算结果缓存 // System.out.println("结果计算完毕,存入分布式缓存中"); // Thread.sleep(Math.round(Math.random()*100)); getMemCachedClient().set(cacheKey, 5*60, list); //计算结果,通常是访问数据库或者远程服务 return list; } }; FutureTask<Object> ft = new FutureTask<Object>(eval); f = cache.putIfAbsent(cacheKey, ft); if (f == null) { // System.out.println("2.任务未命中,将查询数据库或者调用远程服务"); f = ft; ft.run(); } } else { // System.out.println("2.任务命中,直接从缓存取结果"); } try { List<Long> result =(List<Long>) f.get(); // System.out.println("取回的结果result:"+result); long end = System.nanoTime(); useTimes.add(end-beginTime); return result; } catch (Exception e) { e.printStackTrace(); } finally{ //最后将计算任务去掉,虽然已经移除任务对象,但其他线程 //仍然能够获取到计算的结果,直到所有引用都失效,被垃圾回收掉 boolean success = cache.remove(cacheKey,f); // System.out.println(success); } return null; } else { // System.out.println("1.缓存命中,直接返回"); long end = System.nanoTime(); useTimes.add(end-beginTime); return list; } } /** * 服务方法 * 使用monitor来控制并发访问 * @param para1 * @param param2 * @return */ public List<Long> bService(int para1,int param2){ long beginTime = System.nanoTime(); final String cacheKey = "IamKey"; List<Long> list = (List<Long>)getMemCachedClient().get(cacheKey); if(list == null){ // System.out.println("1.缓存未命中,将查询数据库或者调用远程服务"); synchronized(cacheKey.intern()){ List<Long> innerList = (List<Long>)getMemCachedClient().get(cacheKey); if(innerList == null){ // System.out.println(" go to dao or rmi"); List<Long> resultlist = new ArrayList<Long>(); resultlist.add(1l);resultlist.add(2l); //将计算结果缓存 // System.out.println("结果计算完毕,存入分布式缓存中"); // Thread.sleep(Math.round(Math.random()*100)); getMemCachedClient().set(cacheKey, 5*60, resultlist); long end = System.nanoTime(); useTimes.add(end-beginTime); return resultlist; } else { long end = System.nanoTime(); useTimes.add(end-beginTime); return list; } } } else { // System.out.println("1.缓存命中,直接返回"); long end = System.nanoTime(); useTimes.add(end-beginTime); return list; } } public MemcachedClient getMemCachedClient() { return memCachedClient; } public void setMemCachedClient(MemcachedClient memCachedClient) { this.memCachedClient = memCachedClient; } public Vector<Long> getUseTimes() { return useTimes; } public void setUseTimes(Vector<Long> useTimes) { this.useTimes = useTimes; } }
以上实现方法中的aService是推荐使用的方法,bService也是一种防止并发的方法,但由于使用了monitor,其性能不如a方法。
但告诉也能看出纰漏来,aService方法也做不到完全的防并发,在从memcached取出的结果再进行是否为null时,与上面的get方法不能组成原子操作,故aService方法,也可能导致重复多余的访问数据库。