Я использую библиотеку reactive-location.
Мой вариант использования состоит в том, что у меня есть поток объектов, излучаемых наблюдаемым объектом. Эти элементы могут отправляться каждые несколько часов. Как только элемент испускается, я хочу получить местоположение и с помощью zipWith (насколько я понимаю) генерировать объект, содержащий местоположение.
Проблема в том, что поскольку объекты будут испускаться только раз в несколько часов, я не могу поддерживать наблюдаемое место в горячем состоянии, так как это разряжает батарею.
Поэтому мне нужно следующее: как только объект передается в поток, подписаться на наблюдаемое местоположение после получения местоположения, отказаться от подписки на наблюдаемое местоположение. Это нужно делать постоянно.
Насколько я понимаю, этот трансформер занимается отпиской
public <T> Observable.Transformer<T, T> takeNextAndUnsubscribe() {
return new Observable.Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> tObservable) {
final BehaviorSubject subject = BehaviorSubject.create();
Observable source = tObservable.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
subject.onNext(t);
}
});
return Observable
.merge(source.takeUntil(subject), subject)
.take(1);
}
};
}
Но как мне подписаться снова, когда новый объект будет отправлен по потоку?