线程调度文章:https://blog.csdn.net/rnZuoZuo/article/details/83830391
研究结论:默认情况下,不指定生产者线程和消费者线程,运行在当前线程,
但是如果emitter在发射的时候放在一个新的线程里,那么加工线程和消费线程保持和这个新的线程一致
测试代码:
public class MainDemo {public static void main(String[] args){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(final ObservableEmitter<Integer> emitter) throws Exception {new Thread(new Runnable() {@Overridepublic void run() {emitter.onNext(1);emitter.onComplete();}}).start();System.out.println( "rx_call_1:" + Thread.currentThread().getName() );}}). map(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer cmsResponse) throws Exception {System.out.println( "rx_call_map:" + Thread.currentThread().getName() );return 2;}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {System.out.println( "rx_call_2:" + Thread.currentThread().getName() );}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});}
}
下面虽然指定里生产者线程 subscribeOn(Schedulers.io()),但是加工线程和消费线程还是跟随emiter.onnext执行的线程
public class MainDemo {public static void main(String[] args){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(final ObservableEmitter<Integer> emitter) throws Exception {new Thread(new Runnable() {@Overridepublic void run() {emitter.onNext(1);emitter.onComplete();}}).start();System.out.println( "rx_call_1:" + Thread.currentThread().getName() );}}). map(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer cmsResponse) throws Exception {System.out.println( "rx_call_map:" + Thread.currentThread().getName() );return 2;}}) .subscribeOn(Schedulers.io()).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {System.out.println( "rx_call_2:" + Thread.currentThread().getName() );}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}});} }