当前位置: 代码迷 >> 综合 >> disruptor 入门笔记
  详细解决方案

disruptor 入门笔记

热度:80   发布时间:2023-12-21 16:08:47.0

Disruptor 入门例子:

package com.taobao.eagleeye.rt.storm;import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import com.taobao.eagleeye.digest.handler.DigestJob;
import com.taobao.eagleeye.digest.handler.DigesterWorkerHanlder;
import com.taobao.eagleeye.disruptor.EventHandler;
import com.taobao.eagleeye.disruptor.RingBuffer;
import com.taobao.eagleeye.disruptor.dsl.Disruptor;
import com.taobao.tlog.util.NamedThreadFactory;public class Test {private static final int RING_BUFFER_SIZE = 8;public static void main(String[] args) {ExecutorService es = new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new NamedThreadFactory("disruptor-process"));Disruptor<DigestJob> disruptor = new Disruptor<DigestJob>(DigestJob.EVENT_FACTORY, RING_BUFFER_SIZE, es);DigesterWorkerHanlder[] workers = new DigesterWorkerHanlder[12];disruptor.handleEventsWith(new EventHandler<DigestJob>() {@Overridepublic void onEvent(DigestJob event, long sequence, boolean endOfBatch) throws Exception {System.out.println("event=" + event);Thread.sleep(10000);}});RingBuffer<DigestJob> ringBuffer = disruptor.start();while (true) {long seq = ringBuffer.next();DigestJob job = ringBuffer.get(seq);job.context = null;ringBuffer.publish(seq);System.out.println("emit..."+seq);try {Thread.sleep(100);} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
}

 

package com.taobao.tlog.runtime;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.taobao.tlog.disruptor.EventFactory;
import com.taobao.tlog.disruptor.EventHandler;
import com.taobao.tlog.disruptor.RingBuffer;
import com.taobao.tlog.disruptor.WorkHandler;
import com.taobao.tlog.disruptor.dsl.Disruptor;public class ActorTest {static class ActorHandler implementsWorkHandler<String>,EventHandler<String> {@Overridepublic void onEvent(String event, long sequence, boolean endOfBatch) throws Exception {onEvent(event);}@Overridepublic void onEvent(String event) throws Exception {System.out.println(event);throw new RuntimeException("fuck");}}public static void main(String[] args) {ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Action" + "-thread-%d").build();EventFactory<String> EVENT_FACTORY = new EventFactory<String>() {@Overridepublic String newInstance() {return new String();}};ExecutorService es = Executors.newCachedThreadPool(namedThreadFactory);Disruptor<String> disruptor = new Disruptor<String>(EVENT_FACTORY, 8, es);disruptor.handleEventsWith(new ActorHandler());RingBuffer<String> ringBuffer = disruptor.start();long seq = ringBuffer.next();String k = ringBuffer.get(seq);k = "test";ringBuffer.publish(seq);}
}