У меня есть служба, в которой я зарегистрировал обратный звонок, и теперь я хочу выставить его как Flowable
с определенными требованиями / ограничениями:
- Обратный вызов получения потока не должен блокироваться (работа должна быть передана другому потоку / планировщику, указанному наблюдателем)
- Не должно быть никаких исключений из-за того, что потребители замедляют поток.
- Несколько потребителей могут подписаться на него независимо друг от друга.
- потребители могут выбрать буферизацию всех элементов, чтобы ни один из них не был потерян, однако они не должны буферизоваться в классе «производитель».
Ниже то, что у меня есть в настоящее время
class MyBroadcaster {
private PublishProcessor<Packet> packets = PublishProcessor.create();
private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();
public MyBroadcaster() {
//this is actually different to my exact use but same conceptually
registerCallback(packets::onNext);
}
public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
return backpressuredPackets.observeOn(scheduler);
}
}
Я не уверен, что это действительно соответствует моим требованиям. В onBackpressureLatest
javadoc есть примечание относительно observeOn
, которое я не понимаю:
Обратите внимание, что из-за характера того, как запросы обратного давления распространяются через subscribeOn / ObservationOn, запрос более чем 1 из нисходящего потока не гарантирует непрерывную доставку событий onNext.
И у меня есть другие вопросы:
- Позволяет ли вызов
onBackpressureLatest
сделать так, чтобы элементы больше не передавались по многоадресной рассылке? - Как я могу проверить свои требования?
- Бонус: если у меня несколько таких издателей (в том же классе или где-то еще), как лучше всего сделать один и тот же шаблон многоразовым? Создать свой собственный Flowable с делегированием / дополнительными методами?