schedulers是RxJava2中非常重要的概念,也是其最为核心的部分之一。schedulers能够使RxJava2的执行更加高效,可控,能够有效地防止RxJava2中常见的一些问题,如线程阻塞、卡顿等。本文将从多个方面对schedulers进行详细的阐述。
一、schedulers介绍
schedulers是RxJava2中用来控制Observable数据源的线程的调度程序,能够有效地控制Observable的运行、订阅和发射。RxJava2中提供了许多的schedulers,如Schedulers.io()
,Schedulers.newThread()
,Schedulers.computation()
和AndroidSchedulers.mainThread()
等。
为了便于理解schedulers的作用,我们来看一下下面这个例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
在这个例子中,我们在Observable中发射了数字1并在Observer中接收这个数字。我们打印出了Observable和Observer所在的线程名。如果我们不使用schedulers,我们会发现Observable和Observer都在同一个线程中,即主线程中。这样的话,就可能会出现Observable中执行耗时操作导致主线程卡顿或者ANR的问题。
那么,我们可以使用schedulers来改变Observable和Observer所在的线程。例如,我们可以使用Schedulers.newThread()
将Observable切换到一个新的线程中:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
这样,Observable就会切换到一个新的线程中执行,而Observer仍然在主线程中执行。
二、schedulers的类型
RxJava2中提供了多种schedulers类型,每种类型都有不同的用途:
1. Schedulers.io()
Schedulers.io()
是一个异步任务线程池,用于执行I/O操作或延迟任务。它比Schedulers.newThread()
更有效,因为在同一时间可以有多个I/O操作运行在Schedulers.io()
线程池中,而Schedulers.newThread()
始终只能运行一个线程。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
2. Schedulers.newThread()
Schedulers.newThread()
会创建一个新的线程执行任务。如果你没有为任务创建专门的线程池,它就会为你创建线程。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
3. Schedulers.computation()
Schedulers.computation()
会创建一个固定大小的线程池,用于执行CPU密集型操作,比如数学运算。它的线程池大小默认为CPU内核数量,但你也可以通过调用Schedulers.computation(int n)
来使用不同大小的线程池。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
emitter.onNext(1);
}
}).subscribeOn(Schedulers.computation()).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
}
});
4. AndroidSchedulers.mainThread()
AndroidSchedulers.mainThread()
是在Android中使用的schedulers类型,它会将任务切换到Android主线程中运行。一般用于将结果通知给UI组件。
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
textView.setText(String.valueOf(integer));
}
});
三、链式调用和指定线程
在RxJava2中,我们可以使用链式调用来指定Observable和Observer的线程:
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
textView.setText(String.valueOf(integer));
}
});
在这个例子中,我们先使用just
创建了一个包含数字1-4的Observable,然后使用subscribeOn
指定Observable的线程为io线程。接着使用observeOn
将Observer的线程切换到Android主线程中。最后,在subscribe
中指定接收Observable发射出的整数,并将其设置为textView的内容。
四、指定多个线程
有时候,我们需要在不同的地方指定不同的线程,这时我们可以使用RxJava2中的更高级的方法。例如,我们可以使用flatMap
来指定多个不同的线程:
Observable.just(1, 2, 3)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(String.valueOf(integer))
.subscribeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(TAG, "map apply thread is : " + Thread.currentThread().getName());
return s + " mapped to " + Thread.currentThread().getName();
}
})
.observeOn(AndroidSchedulers.mainThread());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
Log.d(TAG, "accept: " + s);
}
});
在这个例子中,我们使用flatMap
将Observable中的每个整数转换为一个新的Observable。在新的Observable中,我们先使用subscribeOn
将线程切换到新的线程中,然后使用map
进行数据转换并指定线程,最后使用observeOn
将线程切换回主线程中。在subscribe
中我们指定接收字符串数据并打印出来。
总结
本文从介绍schedulers,schedulers的类型,链式调用和指定线程以及指定多个线程4个方面对schedulers进行了详细的阐述。如果在RxJava2的开发中没有使用schedulers,就意味着你失去了很多优化和控制的机会。通过本文,希望大家可以更好地理解和使用schedulers。