RxAndroid Observable работает в неожиданном потоке

Я пытаюсь создать Observable таким образом, чтобы он загружал некоторые данные из сети с интервалом и всякий раз, когда пользователь обновляет страницу. Это суть того, что у меня есть до сих пор:

PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.flatMap(t -> {
    // network operations that eventually return a value
    // these operations are not observables themselves
    // they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // update ui with data
}, error -> {
    // do something with error
});

Позже в обратном вызове обновления у меня есть:

refreshSubject.onNext(0L);

Он работает с интервалом нормально, однако, когда я обновляюсь, он взрывается с NetworkOnMainThreadException. Я думал, что справился с этим с помощью subscribeOn/observeOn. Что мне не хватает? Кроме того, почему это не вызывает сбой, когда Observer запускается из интервала?


person mrobinson7627    schedule 09.09.2016    source источник


Ответы (1)


Вы должны изменить свой subscribeOn(Schedulers.io()) на observeOn(Schedulers.io()) и переместить его на карту flatMap. Причина этого в том, что ваш refreshSubject является PublishSubject, который является Observable и Observer.

Поскольку onNext() этого PublishSubject вызывается внутри внутреннего Observable, прежде чем результат будет доставлен в вашу подписку. Это также причина того, что это работает, когда вы просто используете свой Observable (и тот факт, что interval всегда подписывается на поток вычислений по умолчанию).

Просто проверьте вывод этих двух фрагментов:

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.observeOn(Schedulers.io())
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
                // do something with error
            });

vs

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
    // do something with error
});
person Stephan    schedule 09.09.2016
comment
Ваше предложение сработало, но меня немного смущает это утверждение: поскольку onNext() этого PublishSubject вызывается внутри стажера Observable, прежде чем результат будет доставлен в вашу подписку. Что такое стажер Наблюдаемый? - person mrobinson7627; 09.09.2016