RxAndroid Observable berjalan di thread yang tidak terduga

Saya mencoba membuat Observable sedemikian rupa sehingga akan memuat beberapa data dari jaringan pada suatu interval, dan setiap kali pengguna menyegarkan halaman. Inilah inti dari apa yang saya miliki sejauh ini:

PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.flatMap(t -> {
    // network operations that eventually return a value
    // these operations are not observables themselves
    // they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // update ui with data
}, error -> {
    // do something with error
});

Kemudian dalam panggilan balik penyegaran saya punya:

refreshSubject.onNext(0L);

Ini berjalan pada interval dengan baik, namun ketika saya menyegarkan, itu meledak dengan NetworkOnMainThreadException. Saya pikir saya menangani ini dengan subscribeOn/observeOn. Apa yang saya lewatkan? Juga, mengapa hal ini tidak menyebabkan kerusakan ketika Observer dipicu dari interval?


person mrobinson7627    schedule 09.09.2016    source sumber


Jawaban (1)


Anda harus mengubah subscribeOn(Schedulers.io()) menjadi observeOn(Schedulers.io()) dan memindahkannya ke flatMap Anda. Alasannya adalah refreshSubject Anda adalah PublishSubject, yang merupakan Observable dan Observer.

Karena onNext() dari PublishSubject ini dipanggil di dalam Intern Observable terlebih dahulu sebelum hasilnya dikirimkan ke langganan Anda. Ini juga alasan mengapa ini berfungsi ketika Anda hanya menggunakan Observable Anda (dan fakta bahwa interval selalu berlangganan thread komputasi secara default).

Periksa saja keluaran dari dua cuplikan tersebut:

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.observeOn(Schedulers.io())
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
                // do something with error
            });

vs

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
    // do something with error
});
person Stephan    schedule 09.09.2016
comment
Saran Anda berhasil, tapi saya sedikit bingung dengan pernyataan ini: Karena onNext() dari PublishSubject ini dipanggil di dalam Intern Observable terlebih dahulu sebelum hasilnya dikirimkan ke langganan Anda. Apa yang dimaksud dengan magang yang Dapat Diamati? - person mrobinson7627; 09.09.2016