Penanganan Tekanan Balik dan Konkurensi yang tepat dengan RxJava2

Saya memiliki layanan yang telah saya daftarkan panggilan baliknya, dan sekarang saya ingin mengeksposnya sebagai Flowable dengan persyaratan/batasan tertentu:

  1. Callback penerima thread tidak boleh diblokir (pekerjaan harus diserahkan ke thread/penjadwal berbeda yang ditentukan oleh pengamat)
  2. Seharusnya tidak ada pengecualian karena konsumen memperlambat arus
  3. Beberapa konsumen dapat berlangganan secara independen satu sama lain
  4. konsumen dapat memilih untuk melakukan buffering pada semua item sehingga tidak ada satu pun yang hilang, namun item tersebut tidak boleh di-buffer di kelas 'produsen'

Di bawah ini adalah apa yang saya miliki saat ini

class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
       //this is actually different to my exact use but same conceptually
       registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}

Saya tidak yakin apakah ini benar-benar sesuai dengan kebutuhan saya. Ada catatan di onBackpressureLatest javadoc tentang observeOn yang saya tidak mengerti:

Perhatikan bahwa karena sifat bagaimana permintaan tekanan balik disebarkan melalui subscribeOn/observeOn, meminta lebih dari 1 dari downstream tidak menjamin pengiriman kejadian onNext secara berkelanjutan

Dan saya punya pertanyaan lain:

  • Apakah panggilan onBackpressureLatest membuat item tidak lagi multicast?
  • Bagaimana cara menguji persyaratan saya?
  • Bonus: Jika saya memiliki beberapa penerbit seperti itu (di kelas yang sama atau di tempat lain), apa cara terbaik untuk membuat pola yang sama dapat digunakan kembali. Buat Flowable saya sendiri dengan metode delegasi/ekstra?

person Aarjav    schedule 22.05.2018    source sumber


Jawaban (1)


Saya tidak yakin apakah ini benar-benar sesuai dengan kebutuhan saya.

Itu tidak. Terapkan onBackpressureLatest atau onBackpressureBuffer diikuti oleh observeOn di observeSomePacketsOn dan observeAllPacketsOn masing-masing.

Apakah panggilan onBackpressureLatest membuat item tidak lagi multicast?

Multicasting dilakukan oleh PublishProcessor dan pelanggan yang berbeda akan membuat saluran secara independen di mana operator onBackpressureXXX dan observeOn berlaku pada basis pelanggan individual.

Bagaimana cara menguji persyaratan saya?

Berlangganan melalui Flowable lossy atau lossless dengan TestSubscriber (Flowable.test()), masukkan kumpulan Paket yang diketahui ke packets dan lihat apakah semuanya tiba melalui TestSubscriber.assertValueCount() atau TestSubscriber.values(). Yang lossy harusnya 1 .. N dan yang lossless harus memiliki nilai N setelah masa tenggang.

Bonus: Jika saya memiliki beberapa penerbit seperti itu (di kelas yang sama atau di tempat lain), apa cara terbaik untuk membuat pola yang sama dapat digunakan kembali. Buat Flowable saya sendiri dengan metode delegasi/ekstra?

Anda dapat mengubah observeAllPacketsOn menjadi FlowableTransformer dan sebagai ganti pemanggilan metode di MyBroadcaster, gunakan compose, misalnya:

class MyTransformers {
    public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);
person akarnokd    schedule 23.05.2018
comment
Apa perbedaan perilaku antara memanggil onBackPressureXXX dalam suatu metode (untuk setiap pelanggan) vs sekali dan membagikannya ke semua? apakah mereka ikut campur? Terima kasih atas tanggapan Anda, apakah pertanyaan semacam ini lebih baik ditanyakan pada masalah rxjava github? - person Aarjav; 24.05.2018