场景是这样的:
需要监听一个任务队列,多个线程往一个集合(考虑到同步问题,使用了java.util.concurrent.ConcurrentLinkedQueue)里面放置任务编号,有另外一个线程去监听这个集合,拿到这些任务编号去做相关的业务。
现在监听这儿有点疑问,目前采用的是死循环不断地poll任务编号,一旦有几次为null,则sleep一定时间,以降低CPU使用率,然后再起来重复取。。
实际应用中,并发量会比较大,但可能偶尔也会出现系统闲置而睡眠的情况吧。
想问下大家,这个监听功能还有什么更高效的办法来实现吗?wait和notify在这种情况下使用靠谱吗?
谢谢!
------解决方案--------------------
不知道具体的需求是什么,至少“死循环不断地poll任务编号”是不可取的,想办法用消息机制取代。
生产消费模型中,可以主动消费,也可以被动消费,看具体情况了。
------解决方案--------------------
使用消息机制比较靠谱
------解决方案--------------------
应该使用 java.util.concurrent.ArrayBlockingQueue/LinkedBlockingQueue来实现
多个线程往一个集合放入东西时只需要queue.put(id);
另外一个线程去监听这个集合只需要
while(true) {
String id = queue.take();
//do something
}
就可以了,这两个类都已经非常好的实现了wait/notify机制,无须你自己处理了
------解决方案--------------------
4楼正解,jdk1.5提供了比较强大的线程功能,blockqueue极大提高了生产、消费模式开发效率。楼主可以试试
------解决方案--------------------
使用生产者/消费者模型,也就是wait和notify实现同步
------解决方案--------------------
mark一下
------解决方案--------------------
新来乍到 顶一下
------解决方案--------------------
实现多线程池,和监听模式完成.那样的话不用死循环去查看, 速度上好了.
------解决方案--------------------
实际生产关键是考虑队列堵塞的问题!
并发不是问题!!一个队列不行,可以用多个队列嘛!!
一台机器不行,可以用多台机器嘛!!
负载均衡,反向代理 解决这种并发问题是小试牛刀!!
重中之重还是队列堵塞的问题!
前面堵住,后面出不来!!或队列中积累的数量线性增长,内存消耗完系统就当机了!!
堵塞带来的联动悲观结果,就是处理器不停的扫描队列,造成资源消耗尽,灯火熄灭!!
同时竞争扫描带来线程死锁!!
总之,恶性循环!!一塌糊涂!!!哈哈哈
因为堵塞不只是简单的PUT,OUT所能解决的,是关系到整个分布式事务的大问题,而且是生产环境无法逾越的,必须要解决的问题!!
谁欺骗它,生产上就会带来恶劣的报复给你!!
这是非常高级的技术问题,预知道详细问题解答,请联系QQ:871933435
------解决方案--------------------
顶一哈,加油~~
------解决方案--------------------
我做J2ME 手机UI开发的时候,用到线程池,就自己写一个来,也就是用到了wait和notify实现同步。
------解决方案--------------------
顶下
------解决方案--------------------
嗯,死循环肯定是需要的,但是需要退出开关。
并且,可以在每次循环开始的时候现取一个编号,没必要生成一大堆编号扔到队列中吧。
------解决方案--------------------
up
------解决方案--------------------
顶一下
------解决方案--------------------
使用生产者/消费者模型,也就是wait和notify实现同步
------解决方案--------------------
UP
------解决方案--------------------
------解决方案--------------------
现在监听这儿有点疑问,目前采用的是死循环不断地poll任务编号,一旦有几次为null,则sleep一定时间,以降低CPU使用率,然后再起来重复取。。
实际应用中,并发量会比较大,但可能偶尔也会出现系统闲置而睡眠的情况吧。
我一般用的是wait这样才是真正的节约cpu,然后用notify或者notifyAll来实现
------解决方案--------------------
- Java code
package chuangsi_0510.thread.timer;//生产者消费者问题public class TestPC { /** * @param args */ public static void main(String[] args) { Queue queue = new Queue(20) ; Producer p1 = new Producer(queue) ; Producer p2 = new Producer(queue) ; Producer p3 = new Producer(queue) ; Consumer c1 = new Consumer(queue) ; Consumer c2 = new Consumer(queue) ; Consumer c3 = new Consumer(queue) ; Consumer c4 = new Consumer(queue) ; Thread tp1 = new Thread(p1) ; Thread tp2 = new Thread(p2) ; Thread tp3 = new Thread(p3) ; Thread tc1 = new Thread(c1) ; Thread tc2 = new Thread(c1) ; Thread tc3 = new Thread(c1) ; Thread tc4 = new Thread(c1) ; tp1.start() ; tp2.start() ; tp3.start() ; tc1.start() ; tc2.start() ; tc3.start() ; tc4.start() ; }}class Item { int id ; String name; public Item(int id , String name ) { this.id = id ; this.name = name ; } }class Queue { public Object[] items ; private int capacity ; private int in ; //下一个可以放商品的位置 private int out ;//下一个可以取商品的位置 private int size ; public Queue(int capacity) { this.capacity = capacity ; items = new Object[capacity] ; size = 0 ; in = out = 0 ; } public synchronized void insert(Object item) { while (size == capacity) { System.out.println("等待消费者把商品消费掉!"); try { this.wait() ; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } this.notify() ; items[in] = item ; in = (in+1) % capacity ; size++ ; } public synchronized Object remove() { while (size == 0 ) { System.out.println("等待生产者生产产品!"); try { this.wait() ; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } this.notify() ; Object o = items[out] ; out = (out + 1) % capacity ; size-- ; return o ; } }class Producer implements Runnable { private Queue queue ; public Producer(Queue queue) { this.queue = queue ; } public void run() { for (int i = 0 ; i < 30 ; i++ ) { Item item = new Item(i,Thread.currentThread().getName() + "生产的商品" +i) ; queue.insert(item) ; try { Thread.sleep(50) ; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }}class Consumer implements Runnable { private Queue queue ; public Consumer(Queue queue) { this.queue = queue ; } public void run() { for (int i = 0 ; i < 30 ; i++ ) { Item item = (Item)queue.remove() ; try { Thread.sleep( 20) ; } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "消费了" + item.name) ; } } }