Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。
注:什么叫线程安全?这个首先要明确。线程安全的类 ,指的是类内共享的全局变量的访问必须保证是不受多线程形式影响的。如果由于多线程的访问(比如修改、遍历、查看)而使这些变量结构被破坏或者针对这些变量操作的原子性被破坏,则这个类就不是线程安全的。
一、BlockingQueue接口的具体实现类:
1. ArrayBlockingQueue,其构造函数必须带一个int参数来指明其大小
2. LinkedBlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定
3. PriorityBlockingQueue,其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序
BlockingQueue提供的常用方法:
可能报异常 |
返回布尔值 |
可能阻塞 |
设定等待时间 |
|||||
入队 |
add(e) |
offer(e) |
put(e) |
offer(e, timeout, unit) |
||||
出队 |
remove() |
poll() |
take() |
poll(timeout, unit) |
||||
查看 |
element() |
peek() |
无 |
无 |
||||
以ArrayBlockingQueue为例的解析:
1. add(e)remove() element() 方法不会阻塞线程。当不满足约束条件时,会抛出IllegalStateException异常。例如:当队列被元素填满后,再调用add(e),则会抛出异常。
public boolean add(Ee) {
if (offer(e))return true;
elsethrow new IllegalStateException("Queuefull");//队列已满,抛异常
}
public Eremove() {E x = poll();if (x != null)return x;elsethrow new NoSuchElementException();//队列为空,抛异常
}
2.
offer(e)poll() peek()
方法即不会阻塞线程,也不会抛出异常。例如:当队列被元素填满后,再调用
offer(e)
,则不会插入元素,函数返回
false
。
public boolean offer(Ee) {
if (e== null)throw new NullPointerException();final ReentrantLock lock= this.lock;lock.lock();try {if (count == items.length)//队列已满,返回falsereturn false;else {insert(e);//insert方法中发出了notEmpty.signal();return true;}} finally {lock.unlock();}
}
public Epoll() {final ReentrantLock lock= this.lock;lock.lock();try {if (count == 0)//队列为空,返回falsereturn null;E x = extract();//extract方法中发出了notFull.signal();return x;} finally {lock.unlock();}
}
3. 要想要实现阻塞功能,需要调用put(e) take() 方法
。当不满足约束条件时,会阻塞线程。
public void put(Ee)throws InterruptedException {if (e== null)throw new NullPointerException();final E[] items = this.items;final ReentrantLock lock= this.lock;lock.lockInterruptibly();try {try {while (count == items.length)//如果队列已满,等待notFull这个条件,这时当前线程被阻塞notFull.await();} catch (InterruptedException ie){notFull.signal(); //唤醒受notFull阻塞的当前线程throw ie;}insert(e);} finally {lock.unlock();}
}
public Etake() throws InterruptedException {final ReentrantLock lock= this.lock;lock.lockInterruptibly();try {try {while (count == 0)//如果队列为空,等待notEmpty这个条件,这时当前线程被阻塞notEmpty.await();} catch (InterruptedException ie){notEmpty.signal(); //唤醒受notEmpty阻塞的当前线程throw ie;}E x = extract();return x;} finally {lock.unlock();}
}
二、ConcurrentLinkedQueue解剖:
背景:对比锁机制的实现,使用无锁机制的难点在于要充分考虑线程间的协调。简单的说就是多个线程对内部数据结构进行访问时,如果其中一个线程执行的中途因为一些原因出现故障,其他的线程能够检测并帮助完成剩下的操作。这就需要把对数据结构的操作过程精细的划分成多个状态或阶段,考虑每个阶段或状态多线程访问会出现的情况。
ConcurrentLinkedQueue 有两个volatile的线程共享变量:head,tail。要保证这个队列的线程安全就是保证对这两个Node的引用的访问(更新,查看)的原子性和可见性,由于volatile本身能够保证可见性,所以就是对其修改的原子性要被保证。下面通过offer方法的实现来看看在无锁情况下如何保证原子性
public boolean offer(E e) {if (e == null)throw new NullPointerException();Node<E> n = new Node<E>(e, null);for (;;) {Node<E> t = tail;Node<E> s = t.getNext();if (t == tail) { if (s == null) {if (t.casNext(s, n)) { casTail(t, n); return true;}} else {casTail(t, s); }}}
}
使用ConcurrentLinkedQueue时要注意,如果直接使用它提供的函数,比如add或者poll方法,这样我们自己不需要做任何同步
但如果是非原子操作,比如:
if(!queue.isEmpty()){
queue.poll(obj);
}
我们很难保证,在调用了isEmpty()之后,poll()之前,这个queue没有被其他线程修改。所以对于这种情况,我们还是需要自己同步:
synchronized(queue) {if(!queue.isEmpty()) {queue.poll(obj);}
}
注:这种需要进行自己同步的情况要视情况而定,不是任何情况下都需要这样做。
另外还说一下,ConcurrentLinkedQueue的size()是要遍历一遍集合的,所以尽量要避免用size而改用isEmpty(),以免性能过慢。
1 非阻塞的计数器 -使用同步的线程安全的计数器
public finalclass Counter {private long value =0;public synchronizedlong getValue() {return value;}public synchronizedlong increment() {return ++value;}
}
2 使用 AtomicInteger的compareAndSet()(CAS方法)的非阻塞计数器
public class NonblockingCounter{private AtomicIntegervalue;//前面提到过,AtomicInteger类是以原子的方式操作整型变量。public int getValue(){return value.get();}public int increment(){int v;do {v = value.get();while (!value.compareAndSet(v, v +1));return v + 1;}
}
非阻塞版本相对于基于锁的版本有几个性能优势。首先,它用硬件的原生形态代替 JVM的锁定代码路径,从而在更细的粒度层次上(独立的内存位置)进行同步,失败的线程也可以立即重试,而不会被挂起后重新调度。更细的粒度降低了争用的机会,不用重新调度就能重试的能力也降低了争用的成本。即使有少量失败的 CAS操作,这种方法仍然会比由于锁争用造成的重新调度快得多。
NonblockingCounter 这个示例可能简单了些,但是它演示了所有非阻塞算法的一个基本特征——有些算法步骤的执行是要冒险的,因为知道如果 CAS不成功可能不得不重做。非阻塞算法通常叫作乐观算法,因为它们继续操作的假设是不会有干扰。如果发现干扰,就会回退并重试。在计数器的示例中,冒险的步骤是递增——它检索旧值并在旧值上加一,希望在计算更新期间值不会变化。如果它的希望落空,就会再次检索值,并重做递增计算
3 Michael-Scott 非阻塞队列算法- ConcurrentLinkedQueue的算法实现
public class LinkedQueue <E> {private staticclass Node <E> {final E item;final AtomicReference<Node<E>> next;Node(E item, Node<E> next) {this.item = item;this.next = new AtomicReference<Node<E>>(next);}
}private AtomicReference<Node<E>> head= new AtomicReference<Node<E>>(new Node<E>(null,null));private AtomicReference<Node<E>> tail = head;public boolean put(E item) {Node<E> newNode = new Node<E>(item,null);while (true) {Node<E> curTail = tail.get();Node<E> residue = curTail.next.get();if (curTail == tail.get()) {if (residue == null){if (curTail.next.compareAndSet(null, newNode)){tail.compareAndSet(curTail, newNode) ;return true;}}else{tail.compareAndSet(curTail, residue) ;}}}}
}
参考文档:
http://blog.csdn.net/madun/article/details/20313269
http://www.ibm.com/developerworks/cn/java/j-jtp04186/