การจัดการ Backpressure และ Concurrency อย่างเหมาะสมกับ RxJava2

ฉันมีบริการที่ฉันได้ลงทะเบียนการโทรกลับ และตอนนี้ฉันต้องการแสดงเป็น Flowable โดยมีข้อกำหนด/ข้อจำกัดบางประการ:

  1. ไม่ควรบล็อกการโทรกลับเพื่อรับเธรด (งานควรถูกส่งต่อไปยังเธรด/ตัวกำหนดเวลาอื่นที่ผู้สังเกตการณ์ระบุ)
  2. ไม่ควรให้มีข้อยกเว้นใดๆ เนื่องจากผู้บริโภคมีการชะลอตัว
  3. ผู้บริโภคหลายรายสามารถสมัครสมาชิกได้โดยแยกจากกัน
  4. ผู้บริโภคสามารถเลือกบัฟเฟอร์รายการทั้งหมดเพื่อไม่ให้รายการใดสูญหาย อย่างไรก็ตาม ไม่ควรบัฟเฟอร์รายการเหล่านั้นในระดับ 'ผู้ผลิต'

ด้านล่างนี้คือสิ่งที่ฉันมีในปัจจุบัน

class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
       //this is actually different to my exact use but same conceptually
       registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}

ฉันไม่แน่ใจว่าสิ่งนี้ตรงกับความต้องการของฉันจริงหรือไม่ มีหมายเหตุใน onBackpressureLatest javadoc เกี่ยวกับ observeOn ที่ฉันไม่เข้าใจ:

โปรดทราบว่าเนื่องจากลักษณะของวิธีการเผยแพร่คำขอแรงดันย้อนกลับผ่าน SubscribeOn/observeOn การร้องขอมากกว่า 1 รายการจากดาวน์สตรีมจึงไม่รับประกันว่าจะมีการส่งมอบเหตุการณ์ onNext อย่างต่อเนื่อง

และฉันมีคำถามอื่น:

  • การเรียก onBackpressureLatest ทำให้รายการไม่มัลติคาสต์อีกต่อไปหรือไม่
  • ฉันจะทดสอบความต้องการของฉันได้อย่างไร?
  • โบนัส: หากฉันมีผู้จัดพิมพ์หลายราย (ในชั้นเรียนเดียวกันหรือที่อื่น) วิธีใดคือวิธีที่ดีที่สุดที่จะทำให้รูปแบบเดียวกันนี้สามารถนำมาใช้ซ้ำได้ สร้าง Flowable ของฉันเองด้วยวิธีมอบหมาย/วิธีพิเศษหรือไม่

person Aarjav    schedule 22.05.2018    source แหล่งที่มา


คำตอบ (1)


ฉันไม่แน่ใจว่าสิ่งนี้ตรงกับความต้องการของฉันจริงหรือไม่

มันไม่ใช่. ใช้ onBackpressureLatest หรือ onBackpressureBuffer ตามด้วย observeOn ใน observeSomePacketsOn และ observeAllPacketsOn ตามลำดับ

การเรียก onBackpressureLatest ทำให้รายการไม่มัลติคาสต์อีกต่อไปหรือไม่

มัลติคาสต์เสร็จสิ้นโดย PublishProcessor และสมาชิกที่แตกต่างกันจะสร้างช่องทางแยกกันโดยที่ตัวดำเนินการ onBackpressureXXX และ observeOn มีผลกับสมาชิกแต่ละราย

ฉันจะทดสอบความต้องการของฉันได้อย่างไร?

สมัครสมาชิกผ่าน Flowable แบบ lossy หรือ lossless ด้วย TestSubscriber (Flowable.test()) ป้อนชุดแพ็คเก็ตที่รู้จักลงใน packets และดูว่าแพ็กเก็ตทั้งหมดมาถึงทาง TestSubscriber.assertValueCount() หรือ TestSubscriber.values() หรือไม่ ตัวที่สูญเสียควรเป็น 1 .. N และผู้ที่ไม่สูญเสียควรมีค่า N หลังจากช่วงผ่อนผัน

โบนัส: หากฉันมีผู้จัดพิมพ์หลายราย (ในชั้นเรียนเดียวกันหรือที่อื่น) วิธีใดคือวิธีที่ดีที่สุดที่จะทำให้รูปแบบเดียวกันนี้สามารถนำมาใช้ซ้ำได้ สร้าง Flowable ของฉันเองด้วยวิธีมอบหมาย/วิธีพิเศษหรือไม่

คุณสามารถเปลี่ยน observeAllPacketsOn เป็น FlowableTransformer และแทนที่จะใช้การเรียกเมธอดบน MyBroadcaster ให้ใช้ compose ตัวอย่างเช่น:

class MyTransformers {
    public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);
person akarnokd    schedule 23.05.2018
comment
อะไรคือความแตกต่างในการทำงานระหว่างการโทร onBackPressureXXX ในวิธีการ (สำหรับสมาชิกแต่ละราย) เทียบกับครั้งเดียวและแบ่งปันกับทั้งหมด? พวกเขาเข้าไปยุ่งหรือเปล่า? ขอบคุณสำหรับคำตอบ คำถามเหล่านี้เหมาะกับปัญหา rxjava github หรือไม่ - person Aarjav; 24.05.2018