当前位置: 代码迷 >> Java相关 >> 使用memcached设置与读取缓存的最佳实践
  详细解决方案

使用memcached设置与读取缓存的最佳实践

热度:2503   发布时间:2013-12-09 22:01:11.0

缓存设置与读取,系统级别的任务计算。

经测试,采用这种方法设置缓存比使用monitor锁设置缓存快2-3倍,比不采取并发控制设置缓存就不比较了。

本示例使用memcached作为示例,本机缓存及其他缓存类似,直接上代码:

package concorrency; 
         
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.Vector; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.FutureTask; 
         
import net.spy.memcached.MemcachedClient; 
/** 
 * 设置与读取缓存的最佳方法 
 * @author donlianli@126.com 
 */
public class DistributeCacheExample { 
    private final ConcurrentMap<String, FutureTask<Object>> cache = new ConcurrentHashMap<String, FutureTask<Object>>(); 
    private MemcachedClient memCachedClient ; 
    private Vector<Long> useTimes = new Vector<Long>(); 
    DistributeCacheExample(){ 
        try { 
            setMemCachedClient(new MemcachedClient( 
                    new InetSocketAddress("192.168.1.106", 11211))); 
        } catch (IOException e) { 
            e.printStackTrace(); 
        } 
    } 
    public static void main(String[] argvs) throws InterruptedException, ExecutionException{ 
        final DistributeCacheExample example = new DistributeCacheExample(); 
        Thread[] threads = new Thread[50]; 
        example.getMemCachedClient().flush().get(); 
        for(int i=0;i<50;i++){ 
            threads[i] = new Thread(new Runnable(){ 
                public void run() { 
                    example.aService(1, 2); 
                } 
                         
            }); 
            threads[i].start(); 
        } 
        for(int i=0;i<50;i++){ 
            try { 
                threads[i].join(); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
        long totalTime = 0; 
        for(Long u : example.getUseTimes()){ 
            totalTime += u; 
        } 
        System.out.println("totalTime:\t" + totalTime); 
        example.getUseTimes().clear(); 
        example.getMemCachedClient().flush().get(); 
        for(int i=0;i<50;i++){ 
            threads[i] = new Thread(new Runnable(){ 
                public void run() { 
                    example.bService(1, 2); 
                } 
            }); 
            threads[i].start(); 
        } 
        for(int i=0;i<50;i++){ 
            try { 
                threads[i].join(); 
            } catch (InterruptedException e) { 
                e.printStackTrace(); 
            } 
        } 
        totalTime = 0; 
        for(Long u : example.getUseTimes()){ 
            totalTime += u; 
        } 
        System.out.println("syn totalTime:\t" + totalTime); 
    } 
    /** 
     * 服务方法 
     * 使用了并发包来控制并发 
     */
    @SuppressWarnings("unchecked") 
    public List<Long> aService(int para1,int param2){ 
        long beginTime = System.nanoTime(); 
        final String cacheKey = "IamKey"; 
        List<Long> list = (List<Long>)getMemCachedClient().get(cacheKey); 
        if(list == null){ 
//          System.out.println("1.缓存未命中,将查询数据库或者调用远程服务"); 
            //位命中缓存,开始计算 
            FutureTask<Object> f = cache.get(cacheKey); 
            if (f == null) { 
                Callable<Object> eval = new Callable<Object>() { 
                    public Object call() throws InterruptedException { 
//                      System.out.println(" go to dao or rmi"); 
                        List<Long> list = new ArrayList<Long>(); 
                        list.add(1l);list.add(2l); 
                        //将计算结果缓存 
//                      System.out.println("结果计算完毕,存入分布式缓存中"); 
//                      Thread.sleep(Math.round(Math.random()*100)); 
                        getMemCachedClient().set(cacheKey, 5*60, list); 
                        //计算结果,通常是访问数据库或者远程服务 
                        return list; 
                    } 
                }; 
                FutureTask<Object> ft = new FutureTask<Object>(eval); 
                f = cache.putIfAbsent(cacheKey, ft); 
                if (f == null) { 
//                  System.out.println("2.任务未命中,将查询数据库或者调用远程服务"); 
                    f = ft; 
                    ft.run(); 
                } 
            } 
            else { 
//              System.out.println("2.任务命中,直接从缓存取结果"); 
            } 
            try { 
                List<Long> result =(List<Long>) f.get(); 
//              System.out.println("取回的结果result:"+result); 
                long end = System.nanoTime(); 
                useTimes.add(end-beginTime); 
                return result; 
            } catch (Exception e) { 
                e.printStackTrace(); 
            } 
            finally{ 
                //最后将计算任务去掉,虽然已经移除任务对象,但其他线程 
                //仍然能够获取到计算的结果,直到所有引用都失效,被垃圾回收掉 
                boolean success = cache.remove(cacheKey,f); 
//              System.out.println(success); 
            } 
            return null; 
        } 
        else { 
//          System.out.println("1.缓存命中,直接返回"); 
            long end = System.nanoTime(); 
            useTimes.add(end-beginTime); 
            return list; 
        } 
    } 
    /** 
     * 服务方法 
     * 使用monitor来控制并发访问 
     * @param para1 
     * @param param2 
     * @return 
     */
    public List<Long> bService(int para1,int param2){ 
        long beginTime = System.nanoTime(); 
        final String cacheKey = "IamKey"; 
        List<Long> list = (List<Long>)getMemCachedClient().get(cacheKey); 
        if(list == null){ 
//          System.out.println("1.缓存未命中,将查询数据库或者调用远程服务"); 
            synchronized(cacheKey.intern()){ 
                List<Long> innerList = (List<Long>)getMemCachedClient().get(cacheKey); 
                if(innerList == null){ 
//                  System.out.println(" go to dao or rmi"); 
                    List<Long> resultlist = new ArrayList<Long>(); 
                    resultlist.add(1l);resultlist.add(2l); 
                    //将计算结果缓存 
//                  System.out.println("结果计算完毕,存入分布式缓存中"); 
//                  Thread.sleep(Math.round(Math.random()*100)); 
                    getMemCachedClient().set(cacheKey, 5*60, resultlist); 
                    long end = System.nanoTime(); 
                    useTimes.add(end-beginTime); 
                    return resultlist; 
                } 
                else { 
                    long end = System.nanoTime(); 
                    useTimes.add(end-beginTime); 
                    return list; 
                } 
            } 
        } 
        else { 
//          System.out.println("1.缓存命中,直接返回"); 
            long end = System.nanoTime(); 
            useTimes.add(end-beginTime); 
            return list; 
        } 
    } 
    public MemcachedClient getMemCachedClient() { 
        return memCachedClient; 
    } 
    public void setMemCachedClient(MemcachedClient memCachedClient) { 
        this.memCachedClient = memCachedClient; 
    } 
    public Vector<Long> getUseTimes() { 
        return useTimes; 
    } 
    public void setUseTimes(Vector<Long> useTimes) { 
        this.useTimes = useTimes; 
    } 
         
             
}

以上实现方法中的aService是推荐使用的方法,bService也是一种防止并发的方法,但由于使用了monitor,其性能不如a方法。

但告诉也能看出纰漏来,aService方法也做不到完全的防并发,在从memcached取出的结果再进行是否为null时,与上面的get方法不能组成原子操作,故aService方法,也可能导致重复多余的访问数据库。