schedulers详解

发布时间:2023-05-20

schedulers是RxJava2中非常重要的概念,也是其最为核心的部分之一。schedulers能够使RxJava2的执行更加高效,可控,能够有效地防止RxJava2中常见的一些问题,如线程阻塞、卡顿等。本文将从多个方面对schedulers进行详细的阐述。

一、schedulers介绍

schedulers是RxJava2中用来控制Observable数据源的线程的调度程序,能够有效地控制Observable的运行、订阅和发射。RxJava2中提供了许多的schedulers,如Schedulers.io()Schedulers.newThread()Schedulers.computation()AndroidSchedulers.mainThread()等。 为了便于理解schedulers的作用,我们来看一下下面这个例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
        emitter.onNext(1);
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
    }
});

在这个例子中,我们在Observable中发射了数字1并在Observer中接收这个数字。我们打印出了Observable和Observer所在的线程名。如果我们不使用schedulers,我们会发现Observable和Observer都在同一个线程中,即主线程中。这样的话,就可能会出现Observable中执行耗时操作导致主线程卡顿或者ANR的问题。 那么,我们可以使用schedulers来改变Observable和Observer所在的线程。例如,我们可以使用Schedulers.newThread()将Observable切换到一个新的线程中:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
        emitter.onNext(1);
    }
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
    }
});

这样,Observable就会切换到一个新的线程中执行,而Observer仍然在主线程中执行。

二、schedulers的类型

RxJava2中提供了多种schedulers类型,每种类型都有不同的用途:

1. Schedulers.io()

Schedulers.io()是一个异步任务线程池,用于执行I/O操作或延迟任务。它比Schedulers.newThread()更有效,因为在同一时间可以有多个I/O操作运行在Schedulers.io()线程池中,而Schedulers.newThread()始终只能运行一个线程。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
        emitter.onNext(1);
    }
}).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
    }
});

2. Schedulers.newThread()

Schedulers.newThread()会创建一个新的线程执行任务。如果你没有为任务创建专门的线程池,它就会为你创建线程。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
        emitter.onNext(1);
    }
}).subscribeOn(Schedulers.newThread()).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
    }
});

3. Schedulers.computation()

Schedulers.computation()会创建一个固定大小的线程池,用于执行CPU密集型操作,比如数学运算。它的线程池大小默认为CPU内核数量,但你也可以通过调用Schedulers.computation(int n)来使用不同大小的线程池。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
        emitter.onNext(1);
    }
}).subscribeOn(Schedulers.computation()).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
    }
});

4. AndroidSchedulers.mainThread()

AndroidSchedulers.mainThread()是在Android中使用的schedulers类型,它会将任务切换到Android主线程中运行。一般用于将结果通知给UI组件。

Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
                textView.setText(String.valueOf(integer));
            }
        });

三、链式调用和指定线程

在RxJava2中,我们可以使用链式调用来指定Observable和Observer的线程:

Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
                textView.setText(String.valueOf(integer));
            }
        });

在这个例子中,我们先使用just创建了一个包含数字1-4的Observable,然后使用subscribeOn指定Observable的线程为io线程。接着使用observeOn将Observer的线程切换到Android主线程中。最后,在subscribe中指定接收Observable发射出的整数,并将其设置为textView的内容。

四、指定多个线程

有时候,我们需要在不同的地方指定不同的线程,这时我们可以使用RxJava2中的更高级的方法。例如,我们可以使用flatMap来指定多个不同的线程:

Observable.just(1, 2, 3)
        .flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                return Observable.just(String.valueOf(integer))
                        .subscribeOn(Schedulers.newThread())
                        .map(new Function<String, String>() {
                            @Override
                            public String apply(String s) throws Exception {
                                Log.d(TAG, "map apply thread is : " + Thread.currentThread().getName());
                                return s + " mapped to " + Thread.currentThread().getName();
                            }
                        })
                        .observeOn(AndroidSchedulers.mainThread());
            }
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
                Log.d(TAG, "accept: " + s);
            }
        });

在这个例子中,我们使用flatMap将Observable中的每个整数转换为一个新的Observable。在新的Observable中,我们先使用subscribeOn将线程切换到新的线程中,然后使用map进行数据转换并指定线程,最后使用observeOn将线程切换回主线程中。在subscribe中我们指定接收字符串数据并打印出来。

总结

本文从介绍schedulers,schedulers的类型,链式调用和指定线程以及指定多个线程4个方面对schedulers进行了详细的阐述。如果在RxJava2的开发中没有使用schedulers,就意味着你失去了很多优化和控制的机会。通过本文,希望大家可以更好地理解和使用schedulers。