问题描述
RxJava 中是否有与 RxAndroid 中的 AndroidSchedulers.mainThread() 同义的 Scheduler api。 所以如果我在一个新线程上安排一个任务,我想在 Java 主线程上观察它,我该怎么做?
编辑下面是一个示例 RxSubscription,在 system.in 注释下,主线程被杀死,而 Observable.interval 在单独的线程上运行。 在 Android 中,我可以说 observeOn(AndroidSchedulers.MainThread) 和此后的任何操作都将在主线程上运行。 我正在寻找一个类似的 Java 调度程序,因为 AndroidSchedulers 是 RxAndroid 的一部分。
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import rx.Observable;
public class Main {
public static void main(String[] args) throws InterruptedException, IOException {
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
//System.in.read();
}
}
1楼
回到“主”Java 线程目前是不可能的,因为 RxJava 1.x 没有阻塞调度程序。
如果您可以升级到 RxJava 2.x,我有一个可以“固定”到当前线程的特殊调度程序:
compile "com.github.akarnokd:rxjava2-extensions:0.15.1"
阻塞调度器
这种类型的调度程序在“当前线程”上运行其执行循环,更具体地说,是调用其 execute() 方法的线程。 该方法会阻塞,直到调用 shutdown()。 这种类型的调度程序允许从其他线程返回到“主”线程。
public static void main(String[] args) {
BlockingScheduler scheduler = new BlockingScheduler();
scheduler.execute(() -> {
Flowable.range(1, 10)
.subscribeOn(Schedulers.io())
.observeOn(scheduler)
.doAfterTerminate(() -> scheduler.shutdown())
.subscribe(v -> System.out.println(v + " on " + Thread.currentThread()));
});
System.out.println("BlockingScheduler finished");
}
2楼
是的,RxJava 有调度器。 要将消息发送到任何线程,您需要有某种消息循环等待来自其他线程的消息。 在 Android 中,这是您的 Looper。 在 Java 中,您需要自己执行此操作。 然后,您的调度程序会向该线程发送一条消息,并在该消息响应中完成工作。 其机制取决于您如何实现消息队列,但应该相当简单。