Правильная обработка обратного давления и параллелизма с RxJava2

У меня есть служба, в которой я зарегистрировал обратный звонок, и теперь я хочу выставить его как Flowable с определенными требованиями / ограничениями:

  1. Обратный вызов получения потока не должен блокироваться (работа должна быть передана другому потоку / планировщику, указанному наблюдателем)
  2. Не должно быть никаких исключений из-за того, что потребители замедляют поток.
  3. Несколько потребителей могут подписаться на него независимо друг от друга.
  4. потребители могут выбрать буферизацию всех элементов, чтобы ни один из них не был потерян, однако они не должны буферизоваться в классе «производитель».

Ниже то, что у меня есть в настоящее время

class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
       //this is actually different to my exact use but same conceptually
       registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}

Я не уверен, что это действительно соответствует моим требованиям. В onBackpressureLatest javadoc есть примечание относительно observeOn, которое я не понимаю:

Обратите внимание, что из-за характера того, как запросы обратного давления распространяются через subscribeOn / ObservationOn, запрос более чем 1 из нисходящего потока не гарантирует непрерывную доставку событий onNext.

И у меня есть другие вопросы:

  • Позволяет ли вызов onBackpressureLatest сделать так, чтобы элементы больше не передавались по многоадресной рассылке?
  • Как я могу проверить свои требования?
  • Бонус: если у меня несколько таких издателей (в том же классе или где-то еще), как лучше всего сделать один и тот же шаблон многоразовым? Создать свой собственный Flowable с делегированием / дополнительными методами?

person Aarjav    schedule 22.05.2018    source источник


Ответы (1)


Я не уверен, что это действительно соответствует моим требованиям.

Это не. Примените либо onBackpressureLatest, либо onBackpressureBuffer, а затем observeOn в observeSomePacketsOn и observeAllPacketsOn соответственно.

Позволяет ли вызов onBackpressureLatest исключить многоадресную рассылку элементов?

Многоадресная рассылка выполняется PublishProcessor, и разные абоненты будут устанавливать канал к нему независимо, где операторы onBackpressureXXX и observeOn действуют на индивидуальной абонентской основе.

Как я могу проверить свои требования?

Подпишитесь через Flowable с потерями или без потерь с TestSubscriber (Flowable.test()), загрузите известный набор пакетов в packets и посмотрите, все ли они прибыли через TestSubscriber.assertValueCount() или TestSubscriber.values(). Параметр с потерями должен быть 1 .. N, а параметр без потерь должен иметь N значений после периода отсрочки.

Бонус: если у меня несколько таких издателей (в том же классе или где-то еще), как лучше всего сделать один и тот же шаблон многоразовым? Создать свой собственный Flowable с делегированием / дополнительными методами?

Вы можете превратить observeAllPacketsOn в FlowableTransformer и вместо вызова метода в MyBroadcaster использовать compose, например:

class MyTransformers {
    public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);
person akarnokd    schedule 23.05.2018
comment
В чем разница в поведении между вызовом onBackPressureXXX в методе (для каждого подписчика) и однократным вызовом и совместным использованием его для всех? они как-то мешают? Спасибо за ответы, такие вопросы лучше задавать по проблемам rxjava github? - person Aarjav; 24.05.2018