RxAndroid Observable ทำงานบนเธรดที่ไม่คาดคิด

ฉันกำลังพยายามสร้าง Observable เพื่อให้โหลดข้อมูลบางส่วนจากเครือข่ายตามช่วงเวลา และเมื่อใดก็ตามที่ผู้ใช้รีเฟรชหน้าเว็บ นี่คือส่วนสำคัญของสิ่งที่ฉันมีจนถึงตอนนี้:

PublishSubject<Long> refreshSubject = PublishSubject.create();
Observable<MyDataType> observable = Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.flatMap(t -> {
    // network operations that eventually return a value
    // these operations are not observables themselves
    // they are fully blocking network operations
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    // update ui with data
}, error -> {
    // do something with error
});

ต่อมาในการรีเฟรชการโทรกลับฉันมี:

refreshSubject.onNext(0L);

มันทำงานตามช่วงเวลาปกติ แต่เมื่อฉันรีเฟรช มันจะระเบิดด้วย NetworkOnMainThreadException ฉันคิดว่าฉันจัดการเรื่องนี้ด้วย subscribeOn/observeOn ฉันพลาดอะไรไป? นอกจากนี้ เหตุใดสิ่งนี้จึงไม่ทำให้เกิดข้อขัดข้องเมื่อ Observer ถูกทริกเกอร์จากช่วงเวลา


person mrobinson7627    schedule 09.09.2016    source แหล่งที่มา


คำตอบ (1)


คุณต้องเปลี่ยน subscribeOn(Schedulers.io()) เป็น observeOn(Schedulers.io()) และย้ายไปไว้เหนือ flatMap ของคุณ เหตุผลก็คือ refreshSubject ของคุณเป็น PublishSubject ซึ่งเป็น Observable และ Observer

เนื่องจาก onNext() ของ PublishSubject นี้ถูกเรียกภายใน Observable ฝึกงานก่อนที่ผลลัพธ์จะถูกส่งไปยังการสมัครของคุณ นี่คือเหตุผลที่มันใช้งานได้เมื่อคุณใช้ Observable ของคุณ (และความจริงที่ว่า interval สมัครรับเธรดการคำนวณตามค่าเริ่มต้นเสมอ)

เพียงตรวจสอบผลลัพธ์ของตัวอย่างทั้งสองนี้:

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.observeOn(Schedulers.io())
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
                // do something with error
            });

vs

Observable.merge(
    Observable.interval(0, 3, TimeUnit.SECONDS),
    refreshSubject
)
.doOnNext(aLong -> Log.d("Thread", Thread.currentThread().toString()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {
    Log.d("Subscribe Thread", Thread.currentThread().toString());
}, error -> {
    // do something with error
});
person Stephan    schedule 09.09.2016
comment
ข้อเสนอแนะของคุณได้ผล แต่ฉันสับสนเล็กน้อยกับข้อความนี้: เนื่องจาก onNext() ของ PublishSubject นี้ถูกเรียกภายใน Observable ฝึกงานก่อนที่ผลลัพธ์จะถูกส่งไปยังการสมัครของคุณ Observable คืออะไร? - person mrobinson7627; 09.09.2016