最近要为cat增为加一个top key统计,为了避免内存爆掉,希望能实现LRU,但又必须是线程安全的:
google的ConcurrentLinkedHashmap源代码解析
google的ConcurrentLinkedHashmap 源代码解析- Ken-专注后端技术
http://code.google.com/p/concurrentlinkedhashmap/
solr的实现:
package org.apache.solr.common.util;
/*** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/import org.apache.lucene.util.PriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.lang.ref.WeakReference;/*** A LRU cache implementation based upon ConcurrentHashMap and other techniques to reduce* contention and synchronization overhead to utilize multiple CPU cores more effectively.* <p/>* Note that the implementation does not follow a true LRU (least-recently-used) eviction* strategy. Instead it strives to remove least recently used items but when the initial* cleanup does not remove enough items to reach the 'acceptableWaterMark' limit, it can* remove more items forcefully regardless of access order.** @version $Id: ConcurrentLRUCache.java 807872 2009-08-26 04:18:22Z hossman $* @since solr 1.4*/
public class ConcurrentLRUCache<K,V> {private static Logger log = LoggerFactory.getLogger(ConcurrentLRUCache.class);private final ConcurrentHashMap<Object, CacheEntry> map;private final int upperWaterMark, lowerWaterMark;private final ReentrantLock markAndSweepLock = new ReentrantLock(true);private boolean isCleaning = false; // not volatile... piggybacked on other volatile varsprivate final boolean newThreadForCleanup;private volatile boolean islive = true;private final Stats stats = new Stats();private final int acceptableWaterMark;private long oldestEntry = 0; // not volatile, only accessed in the cleaning methodprivate final EvictionListener<K,V> evictionListener;private CleanupThread cleanupThread ;public ConcurrentLRUCache(int upperWaterMark, final int lowerWaterMark, int acceptableWatermark,int initialSize, boolean runCleanupThread, boolean runNewThreadForCleanup,EvictionListener<K,V> evictionListener) {if (upperWaterMark < 1) throw new IllegalArgumentException("upperWaterMark must be > 0");if (lowerWaterMark >= upperWaterMark)throw new IllegalArgumentException("lowerWaterMark must be < upperWaterMark");map = new ConcurrentHashMap<Object, CacheEntry>(initialSize);newThreadForCleanup = runNewThreadForCleanup;this.upperWaterMark = upperWaterMark;this.lowerWaterMark = lowerWaterMark;this.acceptableWaterMark = acceptableWatermark;this.evictionListener = evictionListener;if (runCleanupThread) {cleanupThread = new CleanupThread(this);cleanupThread.start();}}public ConcurrentLRUCache(int size, int lowerWatermark) {this(size, lowerWatermark, (int) Math.floor((lowerWatermark + size) / 2),(int) Math.ceil(0.75 * size), false, false, null);}public void setAlive(boolean live) {islive = live;}public V get(K key) {CacheEntry<K,V> e = map.get(key);if (e == null) {if (islive) stats.missCounter.incrementAndGet();return null;}if (islive) e.lastAccessed = stats.accessCounter.incrementAndGet();return e.value;}public V remove(K key) {CacheEntry<K,V> cacheEntry = map.remove(key);if (cacheEntry != null) {stats.size.decrementAndGet();return cacheEntry.value;}return null;}public Object put(K key, V val) {if (val == null) return null;CacheEntry e = new CacheEntry(key, val, stats.accessCounter.incrementAndGet());CacheEntry oldCacheEntry = map.put(key, e);if (oldCacheEntry == null) {stats.size.incrementAndGet();}if (islive) {stats.putCounter.incrementAndGet();} else {stats.nonLivePutCounter.incrementAndGet();}// Check if we need to clear out old entries from the cache.// isCleaning variable is checked instead of markAndSweepLock.isLocked()// for performance because every put invokation will check until// the size is back to an acceptable level.//// There is a race between the check and the call to markAndSweep, but// it's unimportant because markAndSweep actually aquires the lock or returns if it can't.//// Thread safety note: isCleaning read is piggybacked (comes after) other volatile reads// in this method.if (stats.size.get() > upperWaterMark && !isCleaning) {if (newThreadForCleanup) {new Thread() {public void run() {markAndSweep();}}.start();} else if (cleanupThread != null){cleanupThread.wakeThread();} else {markAndSweep();}}return oldCacheEntry == null ? null : oldCacheEntry.value;}/*** Removes items from the cache to bring the size down* to an acceptable value ('acceptableWaterMark').* <p/>* It is done in two stages. In the first stage, least recently used items are evicted.* If, after the first stage, the cache size is still greater than 'acceptableSize'* config parameter, the second stage takes over.* <p/>* The second stage is more intensive and tries to bring down the cache size* to the 'lowerWaterMark' config parameter.*/private void markAndSweep() {// if we want to keep at least 1000 entries, then timestamps of// current through current-1000 are guaranteed not to be the oldest (but that does// not mean there are 1000 entries in that group... it's acutally anywhere between// 1 and 1000).// Also, if we want to remove 500 entries, then// oldestEntry through oldestEntry+500 are guaranteed to be// removed (however many there are there).if (!markAndSweepLock.tryLock()) return;try {long oldestEntry = this.oldestEntry;isCleaning = true;this.oldestEntry = oldestEntry; // volatile write to make isCleaning visiblelong timeCurrent = stats.accessCounter.get();int sz = stats.size.get();int numRemoved = 0;int numKept = 0;long newestEntry = timeCurrent;long newNewestEntry = -1;long newOldestEntry = Integer.MAX_VALUE;int wantToKeep = lowerWaterMark;int wantToRemove = sz - lowerWaterMark;CacheEntry<K,V>[] eset = new CacheEntry[sz];int eSize = 0;// System.out.println("newestEntry="+newestEntry + " oldestEntry="+oldestEntry);// System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved));for (CacheEntry<K,V> ce : map.values()) {// set lastAccessedCopy to avoid more volatile readsce.lastAccessedCopy = ce.lastAccessed;long thisEntry = ce.lastAccessedCopy;// since the wantToKeep group is likely to be bigger than wantToRemove, check it firstif (thisEntry > newestEntry - wantToKeep) {// this entry is guaranteed not to be in the bottom// group, so do nothing.numKept++;newOldestEntry = Math.min(thisEntry, newOldestEntry);} else if (thisEntry < oldestEntry + wantToRemove) { // entry in bottom group?// this entry is guaranteed to be in the bottom group// so immediately remove it from the map.evictEntry(ce.key);numRemoved++;} else {// This entry *could* be in the bottom group.// Collect these entries to avoid another full pass... this is wasted// effort if enough entries are normally removed in this first pass.// An alternate impl could make a full second pass.if (eSize < eset.length-1) {eset[eSize++] = ce;newNewestEntry = Math.max(thisEntry, newNewestEntry);newOldestEntry = Math.min(thisEntry, newOldestEntry);}}}// System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved));// TODO: allow this to be customized in the constructor?int numPasses=1; // maximum number of linear passes over the data// if we didn't remove enough entries, then make more passes// over the values we collected, with updated min and max values.while (sz - numRemoved > acceptableWaterMark && --numPasses>=0) {oldestEntry = newOldestEntry == Integer.MAX_VALUE ? oldestEntry : newOldestEntry;newOldestEntry = Integer.MAX_VALUE;newestEntry = newNewestEntry;newNewestEntry = -1;wantToKeep = lowerWaterMark - numKept;wantToRemove = sz - lowerWaterMark - numRemoved;// iterate backward to make it easy to remove items.for (int i=eSize-1; i>=0; i--) {CacheEntry<K,V> ce = eset[i];long thisEntry = ce.lastAccessedCopy;if (thisEntry > newestEntry - wantToKeep) {// this entry is guaranteed not to be in the bottom// group, so do nothing but remove it from the eset.numKept++;// remove the entry by moving the last element to it's positioneset[i] = eset[eSize-1];eSize--;newOldestEntry = Math.min(thisEntry, newOldestEntry);} else if (thisEntry < oldestEntry + wantToRemove) { // entry in bottom group?// this entry is guaranteed to be in the bottom group// so immediately remove it from the map.evictEntry(ce.key);numRemoved++;// remove the entry by moving the last element to it's positioneset[i] = eset[eSize-1];eSize--;} else {// This entry *could* be in the bottom group, so keep it in the eset,// and update the stats.newNewestEntry = Math.max(thisEntry, newNewestEntry);newOldestEntry = Math.min(thisEntry, newOldestEntry);}}// System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " esetSz="+ eSize + " sz-numRemoved=" + (sz-numRemoved));}// if we still didn't remove enough entries, then make another pass while// inserting into a priority queueif (sz - numRemoved > acceptableWaterMark) {oldestEntry = newOldestEntry == Integer.MAX_VALUE ? oldestEntry : newOldestEntry;newOldestEntry = Integer.MAX_VALUE;newestEntry = newNewestEntry;newNewestEntry = -1;wantToKeep = lowerWaterMark - numKept;wantToRemove = sz - lowerWaterMark - numRemoved;PQueue queue = new PQueue(wantToRemove);for (int i=eSize-1; i>=0; i--) {CacheEntry<K,V> ce = eset[i];long thisEntry = ce.lastAccessedCopy;if (thisEntry > newestEntry - wantToKeep) {// this entry is guaranteed not to be in the bottom// group, so do nothing but remove it from the eset.numKept++;// removal not necessary on last pass.// eset[i] = eset[eSize-1];// eSize--;newOldestEntry = Math.min(thisEntry, newOldestEntry);} else if (thisEntry < oldestEntry + wantToRemove) { // entry in bottom group?// this entry is guaranteed to be in the bottom group// so immediately remove it.evictEntry(ce.key);numRemoved++;// removal not necessary on last pass.// eset[i] = eset[eSize-1];// eSize--;} else {// This entry *could* be in the bottom group.// add it to the priority queue// everything in the priority queue will be removed, so keep track of// the lowest value that ever comes back out of the queue.// first reduce the size of the priority queue to account for// the number of items we have already removed while executing// this loop so far.queue.myMaxSize = sz - lowerWaterMark - numRemoved;while (queue.size() > queue.myMaxSize && queue.size() > 0) {CacheEntry otherEntry = (CacheEntry) queue.pop();newOldestEntry = Math.min(otherEntry.lastAccessedCopy, newOldestEntry);}if (queue.myMaxSize <= 0) break;Object o = queue.myInsertWithOverflow(ce);if (o != null) {newOldestEntry = Math.min(((CacheEntry)o).lastAccessedCopy, newOldestEntry);}}}// Now delete everything in the priority queue.// avoid using pop() since order doesn't matter anymorefor (Object o : queue.getValues()) {if (o==null) continue;CacheEntry<K,V> ce = (CacheEntry)o;evictEntry(ce.key);numRemoved++;}// System.out.println("items removed:" + numRemoved + " numKept=" + numKept + " initialQueueSize="+ wantToRemove + " finalQueueSize=" + queue.size() + " sz-numRemoved=" + (sz-numRemoved));}oldestEntry = newOldestEntry == Integer.MAX_VALUE ? oldestEntry : newOldestEntry;this.oldestEntry = oldestEntry;} finally {isCleaning = false; // set before markAndSweep.unlock() for visibilitymarkAndSweepLock.unlock();}}private static class PQueue extends PriorityQueue {int myMaxSize;PQueue(int maxSz) {super.initialize(maxSz);myMaxSize = maxSz;}Object[] getValues() { return heap; }protected boolean lessThan(Object a, Object b) {// reverse the parameter order so that the queue keeps the oldest itemsreturn ((CacheEntry)b).lastAccessedCopy < ((CacheEntry)a).lastAccessedCopy;}// necessary because maxSize is private in base classpublic Object myInsertWithOverflow(Object element) {if (size() < myMaxSize) {put(element);return null;} else if (size() > 0 && !lessThan(element, heap[1])) {Object ret = heap[1];heap[1] = element;adjustTop();return ret;} else {return element;}}}private void evictEntry(K key) {CacheEntry<K,V> o = map.remove(key);if (o == null) return;stats.size.decrementAndGet();stats.evictionCounter++;if(evictionListener != null) evictionListener.evictedEntry(o.key,o.value);}/*** Returns 'n' number of oldest accessed entries present in this cache.** This uses a TreeSet to collect the 'n' oldest items ordered by ascending last access time* and returns a LinkedHashMap containing 'n' or less than 'n' entries.* @param n the number of oldest items needed* @return a LinkedHashMap containing 'n' or less than 'n' entries*/public Map<K, V> getOldestAccessedItems(int n) {markAndSweepLock.lock();Map<K, V> result = new LinkedHashMap<K, V>();TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();try {for (Map.Entry<Object, CacheEntry> entry : map.entrySet()) {CacheEntry ce = entry.getValue();ce.lastAccessedCopy = ce.lastAccessed;if (tree.size() < n) {tree.add(ce);} else {if (ce.lastAccessedCopy < tree.first().lastAccessedCopy) {tree.remove(tree.first());tree.add(ce);}}}} finally {markAndSweepLock.unlock();}for (CacheEntry<K, V> e : tree) {result.put(e.key, e.value);}return result;}public Map<K,V> getLatestAccessedItems(int n) {// we need to grab the lock since we are changing lastAccessedCopymarkAndSweepLock.lock();Map<K,V> result = new LinkedHashMap<K,V>();TreeSet<CacheEntry> tree = new TreeSet<CacheEntry>();try {for (Map.Entry<Object, CacheEntry> entry : map.entrySet()) {CacheEntry ce = entry.getValue();ce.lastAccessedCopy = ce.lastAccessed;if (tree.size() < n) {tree.add(ce);} else {if (ce.lastAccessedCopy > tree.last().lastAccessedCopy) {tree.remove(tree.last());tree.add(ce);}}}} finally {markAndSweepLock.unlock();}for (CacheEntry<K,V> e : tree) {result.put(e.key, e.value);}return result;}public int size() {return stats.size.get();}public void clear() {map.clear();}public Map<Object, CacheEntry> getMap() {return map;}private static class CacheEntry<K,V> implements Comparable<CacheEntry> {K key;V value;volatile long lastAccessed = 0;long lastAccessedCopy = 0;public CacheEntry(K key, V value, long lastAccessed) {this.key = key;this.value = value;this.lastAccessed = lastAccessed;}public void setLastAccessed(long lastAccessed) {this.lastAccessed = lastAccessed;}public int compareTo(CacheEntry that) {if (this.lastAccessedCopy == that.lastAccessedCopy) return 0;return this.lastAccessedCopy < that.lastAccessedCopy ? 1 : -1;}public int hashCode() {return value.hashCode();}public boolean equals(Object obj) {return value.equals(obj);}public String toString() {return "key: " + key + " value: " + value + " lastAccessed:" + lastAccessed;}}private boolean isDestroyed = false;public void destroy() {try {if(cleanupThread != null){cleanupThread.stopThread();}} finally {isDestroyed = true;}}public Stats getStats() {return stats;}public static class Stats {private final AtomicLong accessCounter = new AtomicLong(0),putCounter = new AtomicLong(0),nonLivePutCounter = new AtomicLong(0),missCounter = new AtomicLong();private final AtomicInteger size = new AtomicInteger();private long evictionCounter = 0;public long getCumulativeLookups() {return (accessCounter.get() - putCounter.get() - nonLivePutCounter.get()) + missCounter.get();}public long getCumulativeHits() {return accessCounter.get() - putCounter.get() - nonLivePutCounter.get();}public long getCumulativePuts() {return putCounter.get();}public long getCumulativeEvictions() {return evictionCounter;}public int getCurrentSize() {return size.get();}public long getCumulativeNonLivePuts() {return nonLivePutCounter.get();}public long getCumulativeMisses() {return missCounter.get();}}public static interface EvictionListener<K,V>{public void evictedEntry(K key, V value);}private static class CleanupThread extends Thread {private WeakReference<ConcurrentLRUCache> cache;private boolean stop = false;public CleanupThread(ConcurrentLRUCache c) {cache = new WeakReference<ConcurrentLRUCache>(c);}public void run() {while (true) {synchronized (this) {if (stop) break;try {this.wait();} catch (InterruptedException e) {}}if (stop) break;ConcurrentLRUCache c = cache.get();if(c == null) break;c.markAndSweep();}}void wakeThread() {synchronized(this){this.notify();}}void stopThread() {synchronized(this){stop=true;this.notify();}}}protected void finalize() throws Throwable {try {if(!isDestroyed){log.error("ConcurrentLRUCache was not destroyed prior to finalize(), indicates a bug -- POSSIBLE RESOURCE LEAK!!!");destroy();}} finally {super.finalize();}}
}